You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/12/24 09:46:06 UTC

[3/6] camel git commit: Use the shard list filtering when obtaining the first shard when using a {AFTER,AT}_SEQUENCE_NUMBER iterator type.

Use the shard list filtering when obtaining the first shard when using a {AFTER,AT}_SEQUENCE_NUMBER iterator type.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0a25e516
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0a25e516
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0a25e516

Branch: refs/heads/master
Commit: 0a25e51634f848d621d11a6c0abb503f4419bff2
Parents: e2b9d91
Author: Candle <ca...@candle.me.uk>
Authored: Mon Dec 21 13:35:59 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 24 09:45:31 2015 +0100

----------------------------------------------------------------------
 .../aws/ddbstream/DdbStreamConsumer.java        |   8 +-
 .../aws/ddbstream/DdbStreamEndpoint.java        |  38 ++++++-
 .../aws/ddbstream/SequenceNumberProvider.java   |  21 ++++
 .../ddbstream/StaticSequenceNumberProvider.java |  31 ++++++
 .../StringSequenceNumberConverter.java          |  31 ++++++
 .../services/org/apache/camel/TypeConverter     |   1 +
 .../aws/ddbstream/DdbStreamEndpointTest.java    | 104 +++++++++++++++++++
 7 files changed, 229 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
index f5223c0..25e5f31 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
@@ -108,11 +108,17 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer {
             LOG.trace("Current shard is: {} (in {})", currentShard, shardList);
             if (currentShard == null) {
                 switch(getEndpoint().getIteratorType()) {
+                case AFTER_SEQUENCE_NUMBER:
+                    currentShard = shardList.afterSeq(getEndpoint().getSequenceNumber());
+                    break;
+                case AT_SEQUENCE_NUMBER:
+                    currentShard = shardList.atSeq(getEndpoint().getSequenceNumber());
+                    break;
                 case TRIM_HORIZON:
                     currentShard = shardList.first();
                     break;
-                default:
                 case LATEST:
+                default:
                     currentShard = shardList.last();
                     break;
                 }

http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
index 543c432..bc12bc6 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
@@ -49,7 +49,8 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint {
     @UriParam(label = "consumer", description = "Defines where in the DynaboDB stream"
             + " to start getting records. Note that using TRIM_HORIZON can cause a"
             + " significant delay before the stream has caught up to real-time."
-            + " Currently only LATEST and TRIM_HORIZON are supported.",
+            + " if {AT,AFTER}_SEQUENCE_NUMBER are used, then a sequenceNumberProvider"
+            + " MUST be supplied.",
             defaultValue = "LATEST")
     private ShardIteratorType iteratorType = ShardIteratorType.LATEST;
     // TODO add the ability to use ShardIteratorType.{AT,AFTER}_SEQUENCE_NUMBER
@@ -61,6 +62,11 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint {
     // Note that the shard list needs to have the ability to start at the shard
     // that includes the supplied sequence number
 
+    @UriParam(label = "consumer", description = "Provider for the sequence number when"
+            + " using one of the two ShardIteratorType.{AT,AFTER}_SEQUENCE_NUMBER"
+            + " iterator types. Can be a registry reference or a literal sequence number.")
+    private SequenceNumberProvider sequenceNumberProvider;
+
     public DdbStreamEndpoint(String uri, String tableName, DdbStreamComponent component) {
         super(uri, component);
         this.tableName = tableName;
@@ -90,13 +96,30 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint {
         return true;
     }
 
+    public String getSequenceNumber() {
+        switch (getIteratorType()) {
+        case AFTER_SEQUENCE_NUMBER:
+        case AT_SEQUENCE_NUMBER:
+            if (null == getSequenceNumberProvider()) {
+                throw new IllegalStateException("sequenceNumberProvider must be"
+                        + " provided, either as an implementation of"
+                        + " SequenceNumberProvider or a literal String.");
+            } else {
+                return getSequenceNumberProvider().getSequenceNumber();
+            }
+        default:
+            return "";
+        }
+    }
+
     @Override
     public String toString() {
         return "DdbStreamEndpoint{"
                 + "tableName=" + tableName
                 + ", amazonDynamoDbStreamsClient=[redacted], maxResultsPerRequest=" + maxResultsPerRequest
-                + ", iteratorType="
-                + iteratorType + ", uri=" + getEndpointUri()
+                + ", iteratorType=" + iteratorType
+                + ", sequenceNumberProvider=" + sequenceNumberProvider
+                + ", uri=" + getEndpointUri()
                 + '}';
     }
 
@@ -135,5 +158,12 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint {
     public void setIteratorType(ShardIteratorType iteratorType) {
         this.iteratorType = iteratorType;
     }
-    
+
+    public SequenceNumberProvider getSequenceNumberProvider() {
+        return sequenceNumberProvider;
+    }
+
+    public void setSequenceNumberProvider(SequenceNumberProvider sequenceNumberProvider) {
+        this.sequenceNumberProvider = sequenceNumberProvider;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/SequenceNumberProvider.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/SequenceNumberProvider.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/SequenceNumberProvider.java
new file mode 100644
index 0000000..5a9dd8c
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/SequenceNumberProvider.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+public interface SequenceNumberProvider {
+    String getSequenceNumber();
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StaticSequenceNumberProvider.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StaticSequenceNumberProvider.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StaticSequenceNumberProvider.java
new file mode 100644
index 0000000..459767b
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StaticSequenceNumberProvider.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+public class StaticSequenceNumberProvider implements SequenceNumberProvider {
+
+    private final String sequenceNumber;
+
+    public StaticSequenceNumberProvider(String sequenceNumber) {
+        this.sequenceNumber = sequenceNumber;
+    }
+
+    @Override
+    public String getSequenceNumber() {
+        return sequenceNumber;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StringSequenceNumberConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StringSequenceNumberConverter.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StringSequenceNumberConverter.java
new file mode 100644
index 0000000..92bae2b
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StringSequenceNumberConverter.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+import org.apache.camel.Converter;
+
+@Converter
+public class StringSequenceNumberConverter {
+
+    private StringSequenceNumberConverter() {
+    }
+
+    @Converter
+    public static SequenceNumberProvider toSequenceNumberProvider(String sequenceNumber) {
+        return new StaticSequenceNumberProvider(sequenceNumber);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
index 4873a46..4472b59 100644
--- a/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
+++ b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
@@ -16,3 +16,4 @@
 ## ---------------------------------------------------------------------------
 
 org.apache.camel.component.aws.kinesis.RecordStringConverter
+org.apache.camel.component.aws.ddbstream.StringSequenceNumberConverter

http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpointTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpointTest.java
new file mode 100644
index 0000000..9e688be
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpointTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
+import org.apache.camel.CamelContext;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import org.junit.Test;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DdbStreamEndpointTest {
+
+    private CamelContext context;
+
+    @Mock private SequenceNumberProvider sequenceNumberProvider;
+    @Mock private AmazonDynamoDBStreams amazonDynamoDBStreams;
+
+    @Rule public ExpectedException expectedException = ExpectedException.none();
+
+    @Before
+    public void setup() throws Exception {
+        SimpleRegistry registry = new SimpleRegistry();
+        registry.put("someSeqNumProv", sequenceNumberProvider);
+        registry.put("ddbStreamsClient", amazonDynamoDBStreams);
+
+        context = new DefaultCamelContext(registry);
+    }
+
+    @Test
+    public void itExtractsTheSequenceNumber() throws Exception {
+        when(sequenceNumberProvider.getSequenceNumber()).thenReturn("seq");
+
+        DdbStreamEndpoint undertest = (DdbStreamEndpoint)context.getEndpoint("aws-ddbstream://table"
+                + "?amazonDynamoDbStreamsClient=#ddbStreamsClient"
+                + "&iteratorType=AFTER_SEQUENCE_NUMBER"
+                + "&sequenceNumberProvider=#someSeqNumProv");
+
+        assertThat(undertest.getSequenceNumber(), is("seq"));
+    }
+
+    @Test
+    public void itExtractsTheSequenceNumberFromALiteralString() throws Exception {
+
+        DdbStreamEndpoint undertest = (DdbStreamEndpoint)context.getEndpoint("aws-ddbstream://table"
+                + "?amazonDynamoDbStreamsClient=#ddbStreamsClient"
+                + "&iteratorType=AFTER_SEQUENCE_NUMBER"
+                + "&sequenceNumberProvider=seq");
+
+        assertThat(undertest.getSequenceNumber(), is("seq"));
+    }
+
+    @Test
+    public void onSequenceNumberAgnosticIteratorsTheProviderIsIgnored() throws Exception {
+        when(sequenceNumberProvider.getSequenceNumber()).thenReturn("seq");
+
+        DdbStreamEndpoint undertest = (DdbStreamEndpoint)context.getEndpoint("aws-ddbstream://table"
+                + "?amazonDynamoDbStreamsClient=#ddbStreamsClient"
+                + "&iteratorType=LATEST"
+                + "&sequenceNumberProvider=#someSeqNumProv");
+
+        assertThat(undertest.getSequenceNumber(), is(""));
+        verify(sequenceNumberProvider, never()).getSequenceNumber();
+    }
+
+    @Test
+    public void sequenceNumberFetchingThrowsSomethingUsefulIfMisconfigurered() throws Exception {
+        when(sequenceNumberProvider.getSequenceNumber()).thenReturn("seq");
+
+        expectedException.expectMessage(containsString("sequenceNumberProvider"));
+
+        DdbStreamEndpoint undertest = (DdbStreamEndpoint)context.getEndpoint("aws-ddbstream://table"
+                + "?amazonDynamoDbStreamsClient=#ddbStreamsClient"
+                + "&iteratorType=AT_SEQUENCE_NUMBER"); // NOTE: missing sequence number provider parameter
+
+        undertest.getSequenceNumber();
+    }
+}
\ No newline at end of file