You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/12/24 09:46:04 UTC

[1/6] camel git commit: Fix CS.

Repository: camel
Updated Branches:
  refs/heads/master 0350423d9 -> 0570a6ca3


Fix CS.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/37040b10
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/37040b10
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/37040b10

Branch: refs/heads/master
Commit: 37040b10adb4821fe656f506495b20e5b79cb3f7
Parents: 0a25e51
Author: Candle <ca...@candle.me.uk>
Authored: Mon Dec 21 14:05:21 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 24 09:45:31 2015 +0100

----------------------------------------------------------------------
 .../component/aws/ddbstream/ShardList.java      | 22 ++++++-----
 .../StringSequenceNumberConverter.java          |  2 +-
 .../aws/ddbstream/DdbStreamEndpointTest.java    | 14 +++----
 .../ShardListAfterSequenceParametrised.java     | 40 ++++++++++++++------
 .../ShardListAtSequenceParametrised.java        | 40 ++++++++++++++------
 .../component/aws/ddbstream/ShardListTest.java  | 10 ++---
 6 files changed, 82 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/37040b10/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
index 3ae1c4e..0a6e332 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
@@ -16,20 +16,21 @@
  */
 package org.apache.camel.component.aws.ddbstream;
 
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.amazonaws.services.dynamodbv2.model.Shard;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+
+import com.amazonaws.services.dynamodbv2.model.Shard;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 class ShardList {
+
     private final Logger log = LoggerFactory.getLogger(ShardList.class);
 
     private final Map<String, Shard> shards = new HashMap<>();
@@ -54,6 +55,8 @@ class ShardList {
     }
 
     Shard first() {
+        // Potential optimisation: if the two provided sequence numbers are the
+        // same then we can skip the shard entirely. Need to confirm this with AWS.
         for (Shard shard : shards.values()) {
             if (!shards.containsKey(shard.getParentShardId())) {
                 return shard;
@@ -99,14 +102,15 @@ class ShardList {
             }
         }
         if (shards.size() > 0) {
-            return sorted.get(sorted.size()-1);
+            return sorted.get(sorted.size() - 1);
         }
         throw new IllegalStateException("Unable to find a shard with appropriate sequence numbers for " + sequenceNumber + " in " + shards);
     }
 
     /**
-     * Removes shards that are older than the provided shard.
-     * Does not remove the provided shard.
+     * Removes shards that are older than the provided shard. Does not remove
+     * the provided shard.
+     *
      * @param removeBefore
      */
     void removeOlderThan(Shard removeBefore) {
@@ -162,4 +166,4 @@ class ShardList {
             }
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/37040b10/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StringSequenceNumberConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StringSequenceNumberConverter.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StringSequenceNumberConverter.java
index 92bae2b..0048ce0 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StringSequenceNumberConverter.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StringSequenceNumberConverter.java
@@ -19,7 +19,7 @@ package org.apache.camel.component.aws.ddbstream;
 import org.apache.camel.Converter;
 
 @Converter
-public class StringSequenceNumberConverter {
+public final class StringSequenceNumberConverter {
 
     private StringSequenceNumberConverter() {
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/37040b10/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpointTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpointTest.java
index 9e688be..1d3629e 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpointTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpointTest.java
@@ -20,30 +20,30 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
 import org.apache.camel.CamelContext;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.impl.SimpleRegistry;
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import org.junit.Test;
 import org.junit.Before;
 import org.junit.Rule;
+import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-import org.mockito.runners.MockitoJUnitRunner;
 
 @RunWith(MockitoJUnitRunner.class)
 public class DdbStreamEndpointTest {
+    @Rule public ExpectedException expectedException = ExpectedException.none();
 
     private CamelContext context;
 
     @Mock private SequenceNumberProvider sequenceNumberProvider;
     @Mock private AmazonDynamoDBStreams amazonDynamoDBStreams;
 
-    @Rule public ExpectedException expectedException = ExpectedException.none();
-
     @Before
     public void setup() throws Exception {
         SimpleRegistry registry = new SimpleRegistry();

http://git-wip-us.apache.org/repos/asf/camel/blob/37040b10/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAfterSequenceParametrised.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAfterSequenceParametrised.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAfterSequenceParametrised.java
index c33ebe2..f7e6397 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAfterSequenceParametrised.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAfterSequenceParametrised.java
@@ -1,18 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.camel.component.aws.ddbstream;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
+
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
 @RunWith(Parameterized.class)
 public class ShardListAfterSequenceParametrised {
+    private ShardList undertest;
+
+    private final String inputSequenceNumber;
+    private final String expectedShardId;
 
+    public ShardListAfterSequenceParametrised(String inputSequenceNumber, String expectedShardId) {
+        this.inputSequenceNumber = inputSequenceNumber;
+        this.expectedShardId = expectedShardId;
+    }
 
     @Parameterized.Parameters
     public static Collection<Object[]> paramaters() {
@@ -29,16 +55,6 @@ public class ShardListAfterSequenceParametrised {
         return results;
     }
 
-    private ShardList undertest;
-
-    private final String inputSequenceNumber;
-    private final String expectedShardId;
-
-    public ShardListAfterSequenceParametrised(String inputSequenceNumber, String expectedShardId) {
-        this.inputSequenceNumber = inputSequenceNumber;
-        this.expectedShardId = expectedShardId;
-    }
-
     @Before
     public void setup() throws Exception {
         undertest = new ShardList();

http://git-wip-us.apache.org/repos/asf/camel/blob/37040b10/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAtSequenceParametrised.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAtSequenceParametrised.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAtSequenceParametrised.java
index cf15021..9be5636 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAtSequenceParametrised.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAtSequenceParametrised.java
@@ -1,18 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.camel.component.aws.ddbstream;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
+
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
 @RunWith(Parameterized.class)
 public class ShardListAtSequenceParametrised {
+    private ShardList undertest;
+
+    private final String inputSequenceNumber;
+    private final String expectedShardId;
 
+    public ShardListAtSequenceParametrised(String inputSequenceNumber, String expectedShardId) {
+        this.inputSequenceNumber = inputSequenceNumber;
+        this.expectedShardId = expectedShardId;
+    }
 
     @Parameterized.Parameters
     public static Collection<Object[]> paramaters() {
@@ -29,16 +55,6 @@ public class ShardListAtSequenceParametrised {
         return results;
     }
 
-    private ShardList undertest;
-
-    private final String inputSequenceNumber;
-    private final String expectedShardId;
-
-    public ShardListAtSequenceParametrised(String inputSequenceNumber, String expectedShardId) {
-        this.inputSequenceNumber = inputSequenceNumber;
-        this.expectedShardId = expectedShardId;
-    }
-
     @Before
     public void setup() throws Exception {
         undertest = new ShardList();

http://git-wip-us.apache.org/repos/asf/camel/blob/37040b10/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
index e181d7e..68dd53d 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
@@ -17,15 +17,15 @@
 package org.apache.camel.component.aws.ddbstream;
 
 
-import com.amazonaws.services.dynamodbv2.model.SequenceNumberRange;
 import java.util.ArrayList;
 import java.util.List;
-import com.amazonaws.services.dynamodbv2.model.Shard;
 
+import com.amazonaws.services.dynamodbv2.model.SequenceNumberRange;
+import com.amazonaws.services.dynamodbv2.model.Shard;
 import org.junit.Test;
+
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
-import org.junit.Ignore;
 
 public class ShardListTest {
 
@@ -125,8 +125,8 @@ public class ShardListTest {
         List<Shard> result = new ArrayList<>();
         for (int i = 0; i < shardIdsAndSeqNos.length; i += 3) {
             String id = shardIdsAndSeqNos[i];
-            String seqStart = shardIdsAndSeqNos[i+1];
-            String seqEnd = shardIdsAndSeqNos[i+2];
+            String seqStart = shardIdsAndSeqNos[i + 1];
+            String seqEnd = shardIdsAndSeqNos[i + 2];
             result.add(new Shard()
                     .withShardId(id)
                     .withParentShardId(previous)


[4/6] camel git commit: Added tests for the walking of shards.

Posted by da...@apache.org.
Added tests for the walking of shards.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b2430c0a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b2430c0a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b2430c0a

Branch: refs/heads/master
Commit: b2430c0ae4dbbf597288e0a901248b6c9995ac1f
Parents: 37040b1
Author: Candle <ca...@candle.me.uk>
Authored: Mon Dec 21 16:46:05 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 24 09:45:32 2015 +0100

----------------------------------------------------------------------
 .../aws/ddbstream/DdbStreamConsumerTest.java    | 242 +++++++++++++++++++
 1 file changed, 242 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/b2430c0a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java
new file mode 100644
index 0000000..3ef6c70
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java
@@ -0,0 +1,242 @@
+package org.apache.camel.component.aws.ddbstream;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
+import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest;
+import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult;
+import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
+import com.amazonaws.services.dynamodbv2.model.GetRecordsResult;
+import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;
+import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult;
+import com.amazonaws.services.dynamodbv2.model.ListStreamsRequest;
+import com.amazonaws.services.dynamodbv2.model.ListStreamsResult;
+import com.amazonaws.services.dynamodbv2.model.Record;
+import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
+import com.amazonaws.services.dynamodbv2.model.Stream;
+import com.amazonaws.services.dynamodbv2.model.StreamDescription;
+import com.amazonaws.services.dynamodbv2.model.StreamRecord;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DdbStreamConsumerTest {
+
+    private DdbStreamConsumer undertest;
+
+    @Mock private AmazonDynamoDBStreams amazonDynamoDBStreams;
+    @Mock private AsyncProcessor processor;
+    private final CamelContext context = new DefaultCamelContext();
+    private final DdbStreamComponent component = new DdbStreamComponent(context);
+    private DdbStreamEndpoint endpoint = new DdbStreamEndpoint(null, "table_name", component);
+
+    private final String[] seqNums = new String[]{"2", "9", "11", "13", "14", "21", "25", "30", "35", "40"};
+
+    @Before
+    public void setup() throws Exception {
+        endpoint.setAmazonDynamoDbStreamsClient(amazonDynamoDBStreams);
+
+        when(amazonDynamoDBStreams.listStreams(any(ListStreamsRequest.class))).thenReturn(
+            new ListStreamsResult()
+                .withStreams(new Stream()
+                        .withStreamArn("arn:aws:dynamodb:region:12345:table/table_name/stream/timestamp")
+                )
+        );
+
+        when(amazonDynamoDBStreams.describeStream(any(DescribeStreamRequest.class))).thenReturn(
+            new DescribeStreamResult()
+                .withStreamDescription(
+                        new StreamDescription()
+                        .withTableName("table_name")
+                        .withShards(
+                                ShardListTest.createShardsWithSequenceNumbers(null,
+                                        "a", "1", "5",
+                                        "b", "8", "15",
+                                        "c", "16", "16",
+                                        "d", "20", null
+                                )
+                        )
+                )
+        );
+
+        when(amazonDynamoDBStreams.getShardIterator(any(GetShardIteratorRequest.class))).thenAnswer(new Answer<GetShardIteratorResult>() {
+            @Override
+            public GetShardIteratorResult answer(InvocationOnMock invocation) throws Throwable {
+                return new GetShardIteratorResult()
+                        .withShardIterator("shard_iterator_"
+                                + ((GetShardIteratorRequest) invocation.getArguments()[0]).getShardId()
+                                + "_000");
+            }
+        });
+
+        final Map<String, String> shardIterators = new HashMap<>();
+        shardIterators.put("shard_iterator_a_000", "shard_iterator_a_001");
+        shardIterators.put("shard_iterator_b_000", "shard_iterator_b_001");
+        shardIterators.put("shard_iterator_b_001", "shard_iterator_b_002");
+        shardIterators.put("shard_iterator_c_000", "shard_iterator_c_001");
+        shardIterators.put("shard_iterator_d_000", "shard_iterator_d_001");
+        final Map<String, Collection<Record>> answers = new HashMap<>();
+        answers.put("shard_iterator_a_001", createRecords("2"));
+        answers.put("shard_iterator_b_000", createRecords("9"));
+        answers.put("shard_iterator_b_001", createRecords("11", "13"));
+        answers.put("shard_iterator_b_002", createRecords("14"));
+        answers.put("shard_iterator_d_000", createRecords("21", "25"));
+        answers.put("shard_iterator_d_001", createRecords("30", "35", "40"));
+        when(amazonDynamoDBStreams.getRecords(any(GetRecordsRequest.class))).thenAnswer(new Answer<GetRecordsResult>() {
+            @Override
+            public GetRecordsResult answer(InvocationOnMock invocation) throws Throwable {
+                final String shardIterator = ((GetRecordsRequest) invocation.getArguments()[0]).getShardIterator();
+                String nextShardIterator = shardIterators.get(shardIterator); // note that HashMap returns null when there is no entry in the map. A null 'nextShardIterator' indicates that the shard has finished and we should move onto the next shard.
+                Matcher m = Pattern.compile("shard_iterator_d_0*(\\d+)").matcher(shardIterator);
+                Collection<Record> ans = answers.get(shardIterator);
+                if (nextShardIterator == null && m.matches()) { // last shard iterates forever.
+                    Integer num = Integer.parseInt(m.group(1));
+                    nextShardIterator = "shard_iterator_d_" + pad(Integer.toString(num+1), 3);
+                }
+                if (null == ans) { // default to an empty list of records.
+                    ans = createRecords();
+                }
+                return new GetRecordsResult()
+                        .withRecords(ans)
+                        .withNextShardIterator(nextShardIterator);
+            }
+        });
+    }
+
+    String pad(String num, int to) {
+        // lazy padding
+        switch (num.length()) {
+        case 1:
+            return "00" + num;
+        case 2:
+            return "0" + num;
+        default:
+            return num;
+        }
+    }
+
+    @Test
+    public void latestOnlyUsesTheLastShard() throws Exception {
+        endpoint.setIteratorType(ShardIteratorType.LATEST);
+        undertest = new DdbStreamConsumer(endpoint, processor);
+
+        undertest.poll();
+
+        ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
+        verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture());
+        assertThat(getIteratorCaptor.getValue().getShardId(), is("d"));
+    }
+
+    @Test
+    public void latestWithTwoPolls() throws Exception {
+        endpoint.setIteratorType(ShardIteratorType.LATEST);
+        undertest = new DdbStreamConsumer(endpoint, processor);
+
+        undertest.poll();
+        undertest.poll();
+
+        ArgumentCaptor<GetRecordsRequest> getRecordsCaptor = ArgumentCaptor.forClass(GetRecordsRequest.class);
+        verify(amazonDynamoDBStreams, times(2)).getRecords(getRecordsCaptor.capture());
+        assertThat(getRecordsCaptor.getAllValues().get(0).getShardIterator(), is("shard_iterator_d_000"));
+        assertThat(getRecordsCaptor.getAllValues().get(1).getShardIterator(), is("shard_iterator_d_001"));
+    }
+
+    @Test
+    public void trimHorizonStartsWithTheFirstShard() throws Exception {
+        endpoint.setIteratorType(ShardIteratorType.TRIM_HORIZON);
+        undertest = new DdbStreamConsumer(endpoint, processor);
+
+        undertest.poll();
+
+        ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
+        verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture());
+        assertThat(getIteratorCaptor.getValue().getShardId(), is("a"));
+    }
+
+    @Test
+    public void trimHorizonWalksAllShards() throws Exception {
+        endpoint.setIteratorType(ShardIteratorType.TRIM_HORIZON);
+        undertest = new DdbStreamConsumer(endpoint, processor);
+
+        for (int i = 0; i < 9; ++i) {
+            undertest.poll();
+        }
+
+        ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
+        verify(amazonDynamoDBStreams, times(4)).getShardIterator(getIteratorCaptor.capture());
+        assertThat(getIteratorCaptor.getAllValues().get(0).getShardId(), is("a"));
+        assertThat(getIteratorCaptor.getAllValues().get(1).getShardId(), is("b"));
+        assertThat(getIteratorCaptor.getAllValues().get(2).getShardId(), is("c"));
+        assertThat(getIteratorCaptor.getAllValues().get(3).getShardId(), is("d"));
+
+        ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class);
+        verify(processor, times(seqNums.length)).process(exchangeCaptor.capture(), any(AsyncCallback.class));
+
+        for (int i = 0; i < seqNums.length; ++i) {
+            assertThat(exchangeCaptor.getAllValues().get(i).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is(seqNums[i]));
+        }
+    }
+
+    @Test
+    public void atSeqNumber12StartsWithShardB() throws Exception {
+        endpoint.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER);
+        endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("12"));
+        undertest = new DdbStreamConsumer(endpoint, processor);
+
+        undertest.poll();
+
+        ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
+        verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture());
+        assertThat(getIteratorCaptor.getValue().getShardId(), is("b"));
+    }
+
+    @Test
+    public void afterSeqNumber16StartsWithShardC() throws Exception {
+        endpoint.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER);
+        endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("16"));
+        undertest = new DdbStreamConsumer(endpoint, processor);
+
+        undertest.poll();
+
+        ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
+        verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture());
+        assertThat(getIteratorCaptor.getValue().getShardId(), is("c"));
+    }
+
+    private static Collection<Record> createRecords(String... sequenceNumbers) {
+        List<Record> results = new ArrayList<>();
+
+        for (String seqNum : sequenceNumbers) {
+            results.add(new Record()
+                    .withDynamodb(new StreamRecord().withSequenceNumber(seqNum))
+            );
+        }
+
+        return results;
+    }
+    
+}
\ No newline at end of file


[2/6] camel git commit: Add {at, after}-sequence-number semantics to the shard list container

Posted by da...@apache.org.
Add {at,after}-sequence-number semantics to the shard list container


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e2b9d91c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e2b9d91c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e2b9d91c

Branch: refs/heads/master
Commit: e2b9d91c3428e659c36799466de7abe709c4d941
Parents: 0350423
Author: Candle <ca...@candle.me.uk>
Authored: Wed Dec 16 14:58:02 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 24 09:45:31 2015 +0100

----------------------------------------------------------------------
 .../component/aws/ddbstream/ShardList.java      | 67 ++++++++++++++++++++
 .../ShardListAfterSequenceParametrised.java     | 57 +++++++++++++++++
 .../ShardListAtSequenceParametrised.java        | 57 +++++++++++++++++
 .../component/aws/ddbstream/ShardListTest.java  | 24 ++++++-
 4 files changed, 204 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e2b9d91c/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
index a0df179..3ae1c4e 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
@@ -21,6 +21,11 @@ import java.util.HashMap;
 import java.util.Map;
 
 import com.amazonaws.services.dynamodbv2.model.Shard;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,6 +75,35 @@ class ShardList {
         throw new IllegalStateException("Unable to find a shard with no children " + shards);
     }
 
+    Shard afterSeq(String sequenceNumber) {
+        return atAfterSeq(sequenceNumber, After.INSTANCE);
+    }
+
+    Shard atSeq(String sequenceNumber) {
+        return atAfterSeq(sequenceNumber, At.INSTANCE);
+    }
+
+    Shard atAfterSeq(String sequenceNumber, AtAfterCondition condition) {
+        BigInteger atAfter = new BigInteger(sequenceNumber);
+        List<Shard> sorted = new ArrayList<>();
+        sorted.addAll(shards.values());
+        Collections.sort(sorted, StartingSequenceNumberComparator.INSTANCE);
+        for (Shard shard : sorted) {
+            if (shard.getSequenceNumberRange().getEndingSequenceNumber() != null) {
+                BigInteger end = new BigInteger(shard.getSequenceNumberRange().getEndingSequenceNumber());
+                // essentially: after < end or after <= end
+                if (condition.matches(atAfter, end)) {
+                    return shard;
+                }
+
+            }
+        }
+        if (shards.size() > 0) {
+            return sorted.get(sorted.size()-1);
+        }
+        throw new IllegalStateException("Unable to find a shard with appropriate sequence numbers for " + sequenceNumber + " in " + shards);
+    }
+
     /**
      * Removes shards that are older than the provided shard.
      * Does not remove the provided shard.
@@ -95,4 +129,37 @@ class ShardList {
     public String toString() {
         return "ShardList{" + "shards=" + shards + '}';
     }
+
+    private interface AtAfterCondition {
+        boolean matches(BigInteger sequenceNumber, BigInteger option);
+    }
+
+    private static enum After implements AtAfterCondition {
+        INSTANCE() {
+            @Override
+            public boolean matches(BigInteger providedSequenceNumber, BigInteger shardSequenceNumber) {
+                return providedSequenceNumber.compareTo(shardSequenceNumber) < 0;
+            }
+        }
+    }
+
+    private static enum At implements AtAfterCondition {
+        INSTANCE() {
+            @Override
+            public boolean matches(BigInteger providedSequenceNumber, BigInteger shardSequenceNumber) {
+                return providedSequenceNumber.compareTo(shardSequenceNumber) <= 0;
+            }
+        }
+    }
+
+    private static enum StartingSequenceNumberComparator implements Comparator<Shard> {
+        INSTANCE() {
+            @Override
+            public int compare(Shard o1, Shard o2) {
+                BigInteger i1 = new BigInteger(o1.getSequenceNumberRange().getStartingSequenceNumber());
+                BigInteger i2 = new BigInteger(o2.getSequenceNumberRange().getStartingSequenceNumber());
+                return i1.compareTo(i2);
+            }
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/e2b9d91c/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAfterSequenceParametrised.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAfterSequenceParametrised.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAfterSequenceParametrised.java
new file mode 100644
index 0000000..c33ebe2
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAfterSequenceParametrised.java
@@ -0,0 +1,57 @@
+package org.apache.camel.component.aws.ddbstream;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class ShardListAfterSequenceParametrised {
+
+
+    @Parameterized.Parameters
+    public static Collection<Object[]> paramaters() {
+        List<Object[]> results = new ArrayList<>();
+        results.add(new Object[]{"0", "a"});
+        results.add(new Object[]{"3", "a"});
+        results.add(new Object[]{"6", "b"});
+        results.add(new Object[]{"8", "b"});
+        results.add(new Object[]{"15", "c"});
+        results.add(new Object[]{"16", "d"});
+        results.add(new Object[]{"18", "d"});
+        results.add(new Object[]{"25", "d"});
+        results.add(new Object[]{"30", "d"});
+        return results;
+    }
+
+    private ShardList undertest;
+
+    private final String inputSequenceNumber;
+    private final String expectedShardId;
+
+    public ShardListAfterSequenceParametrised(String inputSequenceNumber, String expectedShardId) {
+        this.inputSequenceNumber = inputSequenceNumber;
+        this.expectedShardId = expectedShardId;
+    }
+
+    @Before
+    public void setup() throws Exception {
+        undertest = new ShardList();
+        undertest.addAll(ShardListTest.createShardsWithSequenceNumbers(null,
+                "a", "1", "5",
+                "b", "8", "15",
+                "c", "16", "16",
+                "d", "20", null
+        ));
+    }
+
+    @Test
+    public void assertions() throws Exception {
+        assertThat(undertest.afterSeq(inputSequenceNumber).getShardId(), is(expectedShardId));
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e2b9d91c/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAtSequenceParametrised.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAtSequenceParametrised.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAtSequenceParametrised.java
new file mode 100644
index 0000000..cf15021
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAtSequenceParametrised.java
@@ -0,0 +1,57 @@
+package org.apache.camel.component.aws.ddbstream;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class ShardListAtSequenceParametrised {
+
+
+    @Parameterized.Parameters
+    public static Collection<Object[]> paramaters() {
+        List<Object[]> results = new ArrayList<>();
+        results.add(new Object[]{"0", "a"});
+        results.add(new Object[]{"3", "a"});
+        results.add(new Object[]{"6", "b"});
+        results.add(new Object[]{"8", "b"});
+        results.add(new Object[]{"15", "b"});
+        results.add(new Object[]{"16", "c"});
+        results.add(new Object[]{"18", "d"});
+        results.add(new Object[]{"25", "d"});
+        results.add(new Object[]{"30", "d"});
+        return results;
+    }
+
+    private ShardList undertest;
+
+    private final String inputSequenceNumber;
+    private final String expectedShardId;
+
+    public ShardListAtSequenceParametrised(String inputSequenceNumber, String expectedShardId) {
+        this.inputSequenceNumber = inputSequenceNumber;
+        this.expectedShardId = expectedShardId;
+    }
+
+    @Before
+    public void setup() throws Exception {
+        undertest = new ShardList();
+        undertest.addAll(ShardListTest.createShardsWithSequenceNumbers(null,
+                "a", "1", "5",
+                "b", "8", "15",
+                "c", "16", "16",
+                "d", "20", null
+        ));
+    }
+
+    @Test
+    public void assertions() throws Exception {
+        assertThat(undertest.atSeq(inputSequenceNumber).getShardId(), is(expectedShardId));
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e2b9d91c/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
index 1b7249a..e181d7e 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.aws.ddbstream;
 
 
+import com.amazonaws.services.dynamodbv2.model.SequenceNumberRange;
 import java.util.ArrayList;
 import java.util.List;
 import com.amazonaws.services.dynamodbv2.model.Shard;
@@ -24,6 +25,7 @@ import com.amazonaws.services.dynamodbv2.model.Shard;
 import org.junit.Test;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
+import org.junit.Ignore;
 
 public class ShardListTest {
 
@@ -118,7 +120,27 @@ public class ShardListTest {
         assertThat(shards.first().getShardId(), is("c"));
     }
 
-    List<Shard> createShards(String initialParent, String... shardIds) {
+    static List<Shard> createShardsWithSequenceNumbers(String initialParent, String... shardIdsAndSeqNos) {
+        String previous = initialParent;
+        List<Shard> result = new ArrayList<>();
+        for (int i = 0; i < shardIdsAndSeqNos.length; i += 3) {
+            String id = shardIdsAndSeqNos[i];
+            String seqStart = shardIdsAndSeqNos[i+1];
+            String seqEnd = shardIdsAndSeqNos[i+2];
+            result.add(new Shard()
+                    .withShardId(id)
+                    .withParentShardId(previous)
+                    .withSequenceNumberRange(new SequenceNumberRange()
+                        .withStartingSequenceNumber(seqStart)
+                        .withEndingSequenceNumber(seqEnd)
+                    )
+            );
+            previous = id;
+        }
+        return result;
+    }
+
+    static List<Shard> createShards(String initialParent, String... shardIds) {
         String previous = initialParent;
         List<Shard> result = new ArrayList<>();
         for (String s : shardIds) {


[3/6] camel git commit: Use the shard list filtering when obtaining the first shard when using a {AFTER, AT}_SEQUENCE_NUMBER iterator type.

Posted by da...@apache.org.
Use the shard list filtering when obtaining the first shard when using a {AFTER,AT}_SEQUENCE_NUMBER iterator type.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0a25e516
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0a25e516
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0a25e516

Branch: refs/heads/master
Commit: 0a25e51634f848d621d11a6c0abb503f4419bff2
Parents: e2b9d91
Author: Candle <ca...@candle.me.uk>
Authored: Mon Dec 21 13:35:59 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 24 09:45:31 2015 +0100

----------------------------------------------------------------------
 .../aws/ddbstream/DdbStreamConsumer.java        |   8 +-
 .../aws/ddbstream/DdbStreamEndpoint.java        |  38 ++++++-
 .../aws/ddbstream/SequenceNumberProvider.java   |  21 ++++
 .../ddbstream/StaticSequenceNumberProvider.java |  31 ++++++
 .../StringSequenceNumberConverter.java          |  31 ++++++
 .../services/org/apache/camel/TypeConverter     |   1 +
 .../aws/ddbstream/DdbStreamEndpointTest.java    | 104 +++++++++++++++++++
 7 files changed, 229 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
index f5223c0..25e5f31 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
@@ -108,11 +108,17 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer {
             LOG.trace("Current shard is: {} (in {})", currentShard, shardList);
             if (currentShard == null) {
                 switch(getEndpoint().getIteratorType()) {
+                case AFTER_SEQUENCE_NUMBER:
+                    currentShard = shardList.afterSeq(getEndpoint().getSequenceNumber());
+                    break;
+                case AT_SEQUENCE_NUMBER:
+                    currentShard = shardList.atSeq(getEndpoint().getSequenceNumber());
+                    break;
                 case TRIM_HORIZON:
                     currentShard = shardList.first();
                     break;
-                default:
                 case LATEST:
+                default:
                     currentShard = shardList.last();
                     break;
                 }

http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
index 543c432..bc12bc6 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
@@ -49,7 +49,8 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint {
     @UriParam(label = "consumer", description = "Defines where in the DynaboDB stream"
             + " to start getting records. Note that using TRIM_HORIZON can cause a"
             + " significant delay before the stream has caught up to real-time."
-            + " Currently only LATEST and TRIM_HORIZON are supported.",
+            + " if {AT,AFTER}_SEQUENCE_NUMBER are used, then a sequenceNumberProvider"
+            + " MUST be supplied.",
             defaultValue = "LATEST")
     private ShardIteratorType iteratorType = ShardIteratorType.LATEST;
     // TODO add the ability to use ShardIteratorType.{AT,AFTER}_SEQUENCE_NUMBER
@@ -61,6 +62,11 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint {
     // Note that the shard list needs to have the ability to start at the shard
     // that includes the supplied sequence number
 
+    @UriParam(label = "consumer", description = "Provider for the sequence number when"
+            + " using one of the two ShardIteratorType.{AT,AFTER}_SEQUENCE_NUMBER"
+            + " iterator types. Can be a registry reference or a literal sequence number.")
+    private SequenceNumberProvider sequenceNumberProvider;
+
     public DdbStreamEndpoint(String uri, String tableName, DdbStreamComponent component) {
         super(uri, component);
         this.tableName = tableName;
@@ -90,13 +96,30 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint {
         return true;
     }
 
+    public String getSequenceNumber() {
+        switch (getIteratorType()) {
+        case AFTER_SEQUENCE_NUMBER:
+        case AT_SEQUENCE_NUMBER:
+            if (null == getSequenceNumberProvider()) {
+                throw new IllegalStateException("sequenceNumberProvider must be"
+                        + " provided, either as an implementation of"
+                        + " SequenceNumberProvider or a literal String.");
+            } else {
+                return getSequenceNumberProvider().getSequenceNumber();
+            }
+        default:
+            return "";
+        }
+    }
+
     @Override
     public String toString() {
         return "DdbStreamEndpoint{"
                 + "tableName=" + tableName
                 + ", amazonDynamoDbStreamsClient=[redacted], maxResultsPerRequest=" + maxResultsPerRequest
-                + ", iteratorType="
-                + iteratorType + ", uri=" + getEndpointUri()
+                + ", iteratorType=" + iteratorType
+                + ", sequenceNumberProvider=" + sequenceNumberProvider
+                + ", uri=" + getEndpointUri()
                 + '}';
     }
 
@@ -135,5 +158,12 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint {
     public void setIteratorType(ShardIteratorType iteratorType) {
         this.iteratorType = iteratorType;
     }
-    
+
+    public SequenceNumberProvider getSequenceNumberProvider() {
+        return sequenceNumberProvider;
+    }
+
+    public void setSequenceNumberProvider(SequenceNumberProvider sequenceNumberProvider) {
+        this.sequenceNumberProvider = sequenceNumberProvider;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/SequenceNumberProvider.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/SequenceNumberProvider.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/SequenceNumberProvider.java
new file mode 100644
index 0000000..5a9dd8c
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/SequenceNumberProvider.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+public interface SequenceNumberProvider {
+    String getSequenceNumber();
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StaticSequenceNumberProvider.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StaticSequenceNumberProvider.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StaticSequenceNumberProvider.java
new file mode 100644
index 0000000..459767b
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StaticSequenceNumberProvider.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+public class StaticSequenceNumberProvider implements SequenceNumberProvider {
+
+    private final String sequenceNumber;
+
+    public StaticSequenceNumberProvider(String sequenceNumber) {
+        this.sequenceNumber = sequenceNumber;
+    }
+
+    @Override
+    public String getSequenceNumber() {
+        return sequenceNumber;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StringSequenceNumberConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StringSequenceNumberConverter.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StringSequenceNumberConverter.java
new file mode 100644
index 0000000..92bae2b
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StringSequenceNumberConverter.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+import org.apache.camel.Converter;
+
+@Converter
+public class StringSequenceNumberConverter {
+
+    private StringSequenceNumberConverter() {
+    }
+
+    @Converter
+    public static SequenceNumberProvider toSequenceNumberProvider(String sequenceNumber) {
+        return new StaticSequenceNumberProvider(sequenceNumber);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
index 4873a46..4472b59 100644
--- a/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
+++ b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
@@ -16,3 +16,4 @@
 ## ---------------------------------------------------------------------------
 
 org.apache.camel.component.aws.kinesis.RecordStringConverter
+org.apache.camel.component.aws.ddbstream.StringSequenceNumberConverter

http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpointTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpointTest.java
new file mode 100644
index 0000000..9e688be
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpointTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
+import org.apache.camel.CamelContext;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import org.junit.Test;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DdbStreamEndpointTest {
+
+    private CamelContext context;
+
+    @Mock private SequenceNumberProvider sequenceNumberProvider;
+    @Mock private AmazonDynamoDBStreams amazonDynamoDBStreams;
+
+    @Rule public ExpectedException expectedException = ExpectedException.none();
+
+    @Before
+    public void setup() throws Exception {
+        SimpleRegistry registry = new SimpleRegistry();
+        registry.put("someSeqNumProv", sequenceNumberProvider);
+        registry.put("ddbStreamsClient", amazonDynamoDBStreams);
+
+        context = new DefaultCamelContext(registry);
+    }
+
+    @Test
+    public void itExtractsTheSequenceNumber() throws Exception {
+        when(sequenceNumberProvider.getSequenceNumber()).thenReturn("seq");
+
+        DdbStreamEndpoint undertest = (DdbStreamEndpoint)context.getEndpoint("aws-ddbstream://table"
+                + "?amazonDynamoDbStreamsClient=#ddbStreamsClient"
+                + "&iteratorType=AFTER_SEQUENCE_NUMBER"
+                + "&sequenceNumberProvider=#someSeqNumProv");
+
+        assertThat(undertest.getSequenceNumber(), is("seq"));
+    }
+
+    @Test
+    public void itExtractsTheSequenceNumberFromALiteralString() throws Exception {
+
+        DdbStreamEndpoint undertest = (DdbStreamEndpoint)context.getEndpoint("aws-ddbstream://table"
+                + "?amazonDynamoDbStreamsClient=#ddbStreamsClient"
+                + "&iteratorType=AFTER_SEQUENCE_NUMBER"
+                + "&sequenceNumberProvider=seq");
+
+        assertThat(undertest.getSequenceNumber(), is("seq"));
+    }
+
+    @Test
+    public void onSequenceNumberAgnosticIteratorsTheProviderIsIgnored() throws Exception {
+        when(sequenceNumberProvider.getSequenceNumber()).thenReturn("seq");
+
+        DdbStreamEndpoint undertest = (DdbStreamEndpoint)context.getEndpoint("aws-ddbstream://table"
+                + "?amazonDynamoDbStreamsClient=#ddbStreamsClient"
+                + "&iteratorType=LATEST"
+                + "&sequenceNumberProvider=#someSeqNumProv");
+
+        assertThat(undertest.getSequenceNumber(), is(""));
+        verify(sequenceNumberProvider, never()).getSequenceNumber();
+    }
+
+    @Test
+    public void sequenceNumberFetchingThrowsSomethingUsefulIfMisconfigurered() throws Exception {
+        when(sequenceNumberProvider.getSequenceNumber()).thenReturn("seq");
+
+        expectedException.expectMessage(containsString("sequenceNumberProvider"));
+
+        DdbStreamEndpoint undertest = (DdbStreamEndpoint)context.getEndpoint("aws-ddbstream://table"
+                + "?amazonDynamoDbStreamsClient=#ddbStreamsClient"
+                + "&iteratorType=AT_SEQUENCE_NUMBER"); // NOTE: missing sequence number provider parameter
+
+        undertest.getSequenceNumber();
+    }
+}
\ No newline at end of file


[5/6] camel git commit: Add filtering of the records when asking for an {at, after}_sequence_number iterator type.

Posted by da...@apache.org.
Add filtering of the records when asking for an {at,after}_sequence_number iterator type.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8e05657d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8e05657d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8e05657d

Branch: refs/heads/master
Commit: 8e05657d6a742bb6b8b0026c9683953c38168a66
Parents: b2430c0
Author: Candle <ca...@candle.me.uk>
Authored: Mon Dec 21 18:29:38 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 24 09:45:32 2015 +0100

----------------------------------------------------------------------
 .../aws/ddbstream/AtAfterCondition.java         | 44 +++++++++++++
 .../aws/ddbstream/DdbStreamConsumer.java        | 45 ++++++++++++-
 .../component/aws/ddbstream/ShardList.java      | 24 +------
 .../aws/ddbstream/AtAfterConditionTest.java     | 67 ++++++++++++++++++++
 .../aws/ddbstream/DdbStreamConsumerTest.java    | 56 +++++++++++++++-
 5 files changed, 212 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8e05657d/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/AtAfterCondition.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/AtAfterCondition.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/AtAfterCondition.java
new file mode 100644
index 0000000..a6798b5
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/AtAfterCondition.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+import java.math.BigInteger;
+
+interface AtAfterCondition {
+
+    /**
+     * @return true if sequenceNumber is at or after (AT), or strictly after (AFTER), the endpointSequenceNumber.
+     */
+    boolean matches(BigInteger endpointSequenceNumber, BigInteger sequenceNumber);
+
+    static enum Conditions implements AtAfterCondition {
+        AFTER() {
+            @Override
+            public boolean matches(BigInteger endpointSequenceNumber, BigInteger sequenceNumber) {
+                return endpointSequenceNumber.compareTo(sequenceNumber) < 0;
+            }
+        },
+
+        AT() {
+            @Override
+            public boolean matches(BigInteger endpointSequenceNumber, BigInteger sequenceNumber) {
+                return endpointSequenceNumber.compareTo(sequenceNumber) <= 0;
+            }
+        }
+        // TODO rename to LT/LTEQ/EQ/GTEQ/GT
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8e05657d/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
index 25e5f31..d520abf 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.aws.ddbstream;
 
+import java.math.BigInteger;
 import java.util.ArrayDeque;
 import java.util.List;
 import java.util.Queue;
@@ -31,6 +32,7 @@ import com.amazonaws.services.dynamodbv2.model.ListStreamsRequest;
 import com.amazonaws.services.dynamodbv2.model.ListStreamsResult;
 import com.amazonaws.services.dynamodbv2.model.Record;
 import com.amazonaws.services.dynamodbv2.model.Shard;
+import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -132,6 +134,30 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer {
                     .withStreamArn(streamArn)
                     .withShardId(currentShard.getShardId())
                     .withShardIteratorType(getEndpoint().getIteratorType());
+            switch(getEndpoint().getIteratorType()) {
+            case AFTER_SEQUENCE_NUMBER:
+            case AT_SEQUENCE_NUMBER:
+                // if you request with a sequence number that is LESS than the
+                // start of the shard, you get a HTTP 400 from AWS.
+                // So only add the sequence number if the endpoints
+                // sequence number is AT or AFTER the starting sequence for
+                // the shard.
+                // Otherwise change the shard iterator type to trim_horizon
+                // because we get a 400 when we use one of the
+                // {at,after}_sequence_number iterator types and don't supply
+                // a sequence number.
+                if (AtAfterCondition.Conditions.AT.matches(
+                        new BigInteger(currentShard.getSequenceNumberRange().getStartingSequenceNumber()),
+                        new BigInteger(getEndpoint().getSequenceNumber())
+                )) {
+                    req = req.withSequenceNumber(getEndpoint().getSequenceNumber())
+                        .withShardIteratorType(getEndpoint().getIteratorType());
+                } else {
+                    req = req.withShardIteratorType(ShardIteratorType.TRIM_HORIZON);
+                }
+                break;
+            default:
+            }
             GetShardIteratorResult result = getClient().getShardIterator(req);
             currentShardIterator = result.getShardIterator();
         }
@@ -141,8 +167,25 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer {
 
     private Queue<Exchange> createExchanges(List<Record> records) {
         Queue<Exchange> exchanges = new ArrayDeque<>();
+        AtAfterCondition condition;
+        BigInteger providedSeqNum = null;
+        switch(getEndpoint().getIteratorType()) {
+        case AFTER_SEQUENCE_NUMBER:
+            condition = AtAfterCondition.Conditions.AFTER;
+            providedSeqNum = new BigInteger(getEndpoint().getSequenceNumberProvider().getSequenceNumber());
+            break;
+        case AT_SEQUENCE_NUMBER:
+            condition = AtAfterCondition.Conditions.AT;
+            providedSeqNum = new BigInteger(getEndpoint().getSequenceNumberProvider().getSequenceNumber());
+            break;
+        default:
+            condition = null;
+        }
         for (Record record : records) {
-            exchanges.add(getEndpoint().createExchange(record));
+            BigInteger recordSeqNum = new BigInteger(record.getDynamodb().getSequenceNumber());
+            if (condition == null || condition.matches(providedSeqNum, recordSeqNum)) {
+                exchanges.add(getEndpoint().createExchange(record));
+            }
         }
         return exchanges;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/8e05657d/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
index 0a6e332..5c5bd29 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
@@ -79,11 +79,11 @@ class ShardList {
     }
 
     Shard afterSeq(String sequenceNumber) {
-        return atAfterSeq(sequenceNumber, After.INSTANCE);
+        return atAfterSeq(sequenceNumber, AtAfterCondition.Conditions.AFTER);
     }
 
     Shard atSeq(String sequenceNumber) {
-        return atAfterSeq(sequenceNumber, At.INSTANCE);
+        return atAfterSeq(sequenceNumber, AtAfterCondition.Conditions.AT);
     }
 
     Shard atAfterSeq(String sequenceNumber, AtAfterCondition condition) {
@@ -134,27 +134,9 @@ class ShardList {
         return "ShardList{" + "shards=" + shards + '}';
     }
 
-    private interface AtAfterCondition {
-        boolean matches(BigInteger sequenceNumber, BigInteger option);
-    }
 
-    private static enum After implements AtAfterCondition {
-        INSTANCE() {
-            @Override
-            public boolean matches(BigInteger providedSequenceNumber, BigInteger shardSequenceNumber) {
-                return providedSequenceNumber.compareTo(shardSequenceNumber) < 0;
-            }
-        }
-    }
 
-    private static enum At implements AtAfterCondition {
-        INSTANCE() {
-            @Override
-            public boolean matches(BigInteger providedSequenceNumber, BigInteger shardSequenceNumber) {
-                return providedSequenceNumber.compareTo(shardSequenceNumber) <= 0;
-            }
-        }
-    }
+
 
     private static enum StartingSequenceNumberComparator implements Comparator<Shard> {
         INSTANCE() {

http://git-wip-us.apache.org/repos/asf/camel/blob/8e05657d/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/AtAfterConditionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/AtAfterConditionTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/AtAfterConditionTest.java
new file mode 100644
index 0000000..53aee40
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/AtAfterConditionTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+
+@RunWith(Parameterized.class)
+public class AtAfterConditionTest {
+
+
+    private final AtAfterCondition condition;
+    private final int smaller;
+    private final int bigger;
+    private final boolean result;
+
+    public AtAfterConditionTest(AtAfterCondition condition, int smaller, int bigger, boolean result) {
+        this.condition = condition;
+        this.smaller = smaller;
+        this.bigger = bigger;
+        this.result = result;
+    }
+
+    @Parameterized.Parameters
+    public static Collection<Object[]> parameters() {
+        List<Object[]> results = new ArrayList<>();
+
+        results.add(new Object[]{AtAfterCondition.Conditions.AFTER, 1, 5, true});
+        results.add(new Object[]{AtAfterCondition.Conditions.AT   , 1, 5, true});
+        results.add(new Object[]{AtAfterCondition.Conditions.AFTER, 1, 1, false});
+        results.add(new Object[]{AtAfterCondition.Conditions.AT   , 1, 1, true});
+        results.add(new Object[]{AtAfterCondition.Conditions.AFTER, 5, 1, false});
+        results.add(new Object[]{AtAfterCondition.Conditions.AT   , 5, 1, false});
+
+        return results;
+    }
+
+    @Test
+    public void test() throws Exception {
+        assertThat(condition.matches(BigInteger.valueOf(smaller), BigInteger.valueOf(bigger)), is(result));
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/8e05657d/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java
index 3ef6c70..0cbf9ca 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.camel.component.aws.ddbstream;
 
 import java.util.ArrayList;
@@ -110,12 +126,15 @@ public class DdbStreamConsumerTest {
             @Override
             public GetRecordsResult answer(InvocationOnMock invocation) throws Throwable {
                 final String shardIterator = ((GetRecordsRequest) invocation.getArguments()[0]).getShardIterator();
-                String nextShardIterator = shardIterators.get(shardIterator); // note that HashMap returns null when there is no entry in the map. A null 'nextShardIterator' indicates that the shard has finished and we should move onto the next shard.
+                // note that HashMap returns null when there is no entry in the map.
+                // A null 'nextShardIterator' indicates that the shard has finished
+                // and we should move onto the next shard.
+                String nextShardIterator = shardIterators.get(shardIterator);
                 Matcher m = Pattern.compile("shard_iterator_d_0*(\\d+)").matcher(shardIterator);
                 Collection<Record> ans = answers.get(shardIterator);
                 if (nextShardIterator == null && m.matches()) { // last shard iterates forever.
                     Integer num = Integer.parseInt(m.group(1));
-                    nextShardIterator = "shard_iterator_d_" + pad(Integer.toString(num+1), 3);
+                    nextShardIterator = "shard_iterator_d_" + pad(Integer.toString(num + 1), 3);
                 }
                 if (null == ans) { // default to an empty list of records.
                     ans = createRecords();
@@ -227,6 +246,39 @@ public class DdbStreamConsumerTest {
         assertThat(getIteratorCaptor.getValue().getShardId(), is("c"));
     }
 
+    @Test
+    public void atSeqNumber35GivesFirstRecordWithSeq35() throws Exception {
+        endpoint.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER);
+        endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("35"));
+        undertest = new DdbStreamConsumer(endpoint, processor);
+
+        for (int i = 0; i < 10; ++i) { // poll lots.
+            undertest.poll();
+        }
+
+        ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class);
+        verify(processor, times(2)).process(exchangeCaptor.capture(), any(AsyncCallback.class));
+
+        assertThat(exchangeCaptor.getAllValues().get(0).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is("35"));
+        assertThat(exchangeCaptor.getAllValues().get(1).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is("40"));
+    }
+
+    @Test
+    public void afterSeqNumber35GivesFirstRecordWithSeq40() throws Exception {
+        endpoint.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
+        endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("35"));
+        undertest = new DdbStreamConsumer(endpoint, processor);
+
+        for (int i = 0; i < 10; ++i) { // poll lots.
+            undertest.poll();
+        }
+
+        ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class);
+        verify(processor, times(1)).process(exchangeCaptor.capture(), any(AsyncCallback.class));
+
+        assertThat(exchangeCaptor.getAllValues().get(0).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is("40"));
+    }
+
     private static Collection<Record> createRecords(String... sequenceNumbers) {
         List<Record> results = new ArrayList<>();
 


[6/6] camel git commit: Refactor to improve readability of comparisons

Posted by da...@apache.org.
Refactor to improve readability of comparisons


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0570a6ca
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0570a6ca
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0570a6ca

Branch: refs/heads/master
Commit: 0570a6ca30fa3bf20a3606d91b4e021ecc9c6aa6
Parents: 8e05657
Author: Candle <ca...@candle.me.uk>
Authored: Mon Dec 21 18:44:33 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 24 09:45:32 2015 +0100

----------------------------------------------------------------------
 .../aws/ddbstream/AtAfterCondition.java         | 44 -------------
 .../aws/ddbstream/BigIntComparisons.java        | 44 +++++++++++++
 .../aws/ddbstream/DdbStreamConsumer.java        | 15 ++---
 .../component/aws/ddbstream/ShardList.java      |  6 +-
 .../aws/ddbstream/AtAfterConditionTest.java     | 67 --------------------
 .../aws/ddbstream/BigIntComparisonsTest.java    | 67 ++++++++++++++++++++
 6 files changed, 121 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0570a6ca/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/AtAfterCondition.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/AtAfterCondition.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/AtAfterCondition.java
deleted file mode 100644
index a6798b5..0000000
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/AtAfterCondition.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.aws.ddbstream;
-
-import java.math.BigInteger;
-
-interface AtAfterCondition {
-
-    /**
-     * @return true if sequenceNumber is (at,after) the endpointSequenceNumber.
-     */
-    boolean matches(BigInteger endpointSequenceNumber, BigInteger sequenceNumber);
-
-    static enum Conditions implements AtAfterCondition {
-        AFTER() {
-            @Override
-            public boolean matches(BigInteger endpointSequenceNumber, BigInteger sequenceNumber) {
-                return endpointSequenceNumber.compareTo(sequenceNumber) < 0;
-            }
-        },
-
-        AT() {
-            @Override
-            public boolean matches(BigInteger endpointSequenceNumber, BigInteger sequenceNumber) {
-                return endpointSequenceNumber.compareTo(sequenceNumber) <= 0;
-            }
-        }
-        // TODO rename to LT/LTEQ/EQ/GTEQ/GT
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/0570a6ca/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/BigIntComparisons.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/BigIntComparisons.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/BigIntComparisons.java
new file mode 100644
index 0000000..a06bd0a
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/BigIntComparisons.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+import java.math.BigInteger;
+
+interface BigIntComparisons {
+
+    /**
+     * @return true if the first parameter is LT/LTEQ/EQ/GTEQ/GT the second
+     */
+    boolean matches(BigInteger first, BigInteger second);
+
+    static enum Conditions implements BigIntComparisons {
+        LT() {
+            @Override
+            public boolean matches(BigInteger first, BigInteger second) {
+                return first.compareTo(second) < 0;
+            }
+        },
+
+        LTEQ() {
+            @Override
+            public boolean matches(BigInteger first, BigInteger second) {
+                return first.compareTo(second) <= 0;
+            }
+        }
+        // TODO Add EQ/GTEQ/GT as needed, but note that GTEQ == !LT and GT == !LTEQ and EQ == (!LT && !GT)
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0570a6ca/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
index d520abf..aa8224f 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
@@ -140,18 +140,17 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer {
                 // if you request with a sequence number that is LESS than the
                 // start of the shard, you get a HTTP 400 from AWS.
                 // So only add the sequence number if the endpoints
-                // sequence number is AT or AFTER the starting sequence for
-                // the shard.
+                // sequence number is greater than or equal to the starting
+                // sequence for the shard.
                 // Otherwise change the shard iterator type to trim_horizon
                 // because we get a 400 when we use one of the
                 // {at,after}_sequence_number iterator types and don't supply
                 // a sequence number.
-                if (AtAfterCondition.Conditions.AT.matches(
+                if (BigIntComparisons.Conditions.LTEQ.matches(
                         new BigInteger(currentShard.getSequenceNumberRange().getStartingSequenceNumber()),
                         new BigInteger(getEndpoint().getSequenceNumber())
                 )) {
-                    req = req.withSequenceNumber(getEndpoint().getSequenceNumber())
-                        .withShardIteratorType(getEndpoint().getIteratorType());
+                    req = req.withSequenceNumber(getEndpoint().getSequenceNumber());
                 } else {
                     req = req.withShardIteratorType(ShardIteratorType.TRIM_HORIZON);
                 }
@@ -167,15 +166,15 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer {
 
     private Queue<Exchange> createExchanges(List<Record> records) {
         Queue<Exchange> exchanges = new ArrayDeque<>();
-        AtAfterCondition condition;
+        BigIntComparisons condition;
         BigInteger providedSeqNum = null;
         switch(getEndpoint().getIteratorType()) {
         case AFTER_SEQUENCE_NUMBER:
-            condition = AtAfterCondition.Conditions.AFTER;
+            condition = BigIntComparisons.Conditions.LT;
             providedSeqNum = new BigInteger(getEndpoint().getSequenceNumberProvider().getSequenceNumber());
             break;
         case AT_SEQUENCE_NUMBER:
-            condition = AtAfterCondition.Conditions.AT;
+            condition = BigIntComparisons.Conditions.LTEQ;
             providedSeqNum = new BigInteger(getEndpoint().getSequenceNumberProvider().getSequenceNumber());
             break;
         default:

http://git-wip-us.apache.org/repos/asf/camel/blob/0570a6ca/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
index 5c5bd29..8852ada 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
@@ -79,14 +79,14 @@ class ShardList {
     }
 
     Shard afterSeq(String sequenceNumber) {
-        return atAfterSeq(sequenceNumber, AtAfterCondition.Conditions.AFTER);
+        return atAfterSeq(sequenceNumber, BigIntComparisons.Conditions.LT);
     }
 
     Shard atSeq(String sequenceNumber) {
-        return atAfterSeq(sequenceNumber, AtAfterCondition.Conditions.AT);
+        return atAfterSeq(sequenceNumber, BigIntComparisons.Conditions.LTEQ);
     }
 
-    Shard atAfterSeq(String sequenceNumber, AtAfterCondition condition) {
+    Shard atAfterSeq(String sequenceNumber, BigIntComparisons condition) {
         BigInteger atAfter = new BigInteger(sequenceNumber);
         List<Shard> sorted = new ArrayList<>();
         sorted.addAll(shards.values());

http://git-wip-us.apache.org/repos/asf/camel/blob/0570a6ca/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/AtAfterConditionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/AtAfterConditionTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/AtAfterConditionTest.java
deleted file mode 100644
index 53aee40..0000000
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/AtAfterConditionTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.aws.ddbstream;
-
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-
-
-@RunWith(Parameterized.class)
-public class AtAfterConditionTest {
-
-
-    private final AtAfterCondition condition;
-    private final int smaller;
-    private final int bigger;
-    private final boolean result;
-
-    public AtAfterConditionTest(AtAfterCondition condition, int smaller, int bigger, boolean result) {
-        this.condition = condition;
-        this.smaller = smaller;
-        this.bigger = bigger;
-        this.result = result;
-    }
-
-    @Parameterized.Parameters
-    public static Collection<Object[]> parameters() {
-        List<Object[]> results = new ArrayList<>();
-
-        results.add(new Object[]{AtAfterCondition.Conditions.AFTER, 1, 5, true});
-        results.add(new Object[]{AtAfterCondition.Conditions.AT   , 1, 5, true});
-        results.add(new Object[]{AtAfterCondition.Conditions.AFTER, 1, 1, false});
-        results.add(new Object[]{AtAfterCondition.Conditions.AT   , 1, 1, true});
-        results.add(new Object[]{AtAfterCondition.Conditions.AFTER, 5, 1, false});
-        results.add(new Object[]{AtAfterCondition.Conditions.AT   , 5, 1, false});
-
-        return results;
-    }
-
-    @Test
-    public void test() throws Exception {
-        assertThat(condition.matches(BigInteger.valueOf(smaller), BigInteger.valueOf(bigger)), is(result));
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/0570a6ca/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/BigIntComparisonsTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/BigIntComparisonsTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/BigIntComparisonsTest.java
new file mode 100644
index 0000000..c881d41
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/BigIntComparisonsTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+
+@RunWith(Parameterized.class)
+public class BigIntComparisonsTest {
+
+
+    private final BigIntComparisons condition;
+    private final int smaller;
+    private final int bigger;
+    private final boolean result;
+
+    public BigIntComparisonsTest(BigIntComparisons condition, int smaller, int bigger, boolean result) {
+        this.condition = condition;
+        this.smaller = smaller;
+        this.bigger = bigger;
+        this.result = result;
+    }
+
+    @Parameterized.Parameters
+    public static Collection<Object[]> parameters() {
+        List<Object[]> results = new ArrayList<>();
+
+        results.add(new Object[]{BigIntComparisons.Conditions.LT  , 1, 5, true});
+        results.add(new Object[]{BigIntComparisons.Conditions.LTEQ, 1, 5, true});
+        results.add(new Object[]{BigIntComparisons.Conditions.LT  , 1, 1, false});
+        results.add(new Object[]{BigIntComparisons.Conditions.LTEQ, 1, 1, true});
+        results.add(new Object[]{BigIntComparisons.Conditions.LT  , 5, 1, false});
+        results.add(new Object[]{BigIntComparisons.Conditions.LTEQ, 5, 1, false});
+
+        return results;
+    }
+
+    @Test
+    public void test() throws Exception {
+        assertThat(condition.matches(BigInteger.valueOf(smaller), BigInteger.valueOf(bigger)), is(result));
+    }
+
+}
\ No newline at end of file