You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/12/24 09:46:06 UTC
[3/6] camel git commit: Use the shard list filtering when obtaining
the first shard when using a {AFTER, AT}_SEQUENCE_NUMBER iterator type.
Use the shard list filtering when obtaining the first shard when using a {AFTER,AT}_SEQUENCE_NUMBER iterator type.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0a25e516
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0a25e516
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0a25e516
Branch: refs/heads/master
Commit: 0a25e51634f848d621d11a6c0abb503f4419bff2
Parents: e2b9d91
Author: Candle <ca...@candle.me.uk>
Authored: Mon Dec 21 13:35:59 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 24 09:45:31 2015 +0100
----------------------------------------------------------------------
.../aws/ddbstream/DdbStreamConsumer.java | 8 +-
.../aws/ddbstream/DdbStreamEndpoint.java | 38 ++++++-
.../aws/ddbstream/SequenceNumberProvider.java | 21 ++++
.../ddbstream/StaticSequenceNumberProvider.java | 31 ++++++
.../StringSequenceNumberConverter.java | 31 ++++++
.../services/org/apache/camel/TypeConverter | 1 +
.../aws/ddbstream/DdbStreamEndpointTest.java | 104 +++++++++++++++++++
7 files changed, 229 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
index f5223c0..25e5f31 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
@@ -108,11 +108,17 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer {
LOG.trace("Current shard is: {} (in {})", currentShard, shardList);
if (currentShard == null) {
switch(getEndpoint().getIteratorType()) {
+ case AFTER_SEQUENCE_NUMBER:
+ currentShard = shardList.afterSeq(getEndpoint().getSequenceNumber());
+ break;
+ case AT_SEQUENCE_NUMBER:
+ currentShard = shardList.atSeq(getEndpoint().getSequenceNumber());
+ break;
case TRIM_HORIZON:
currentShard = shardList.first();
break;
- default:
case LATEST:
+ default:
currentShard = shardList.last();
break;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
index 543c432..bc12bc6 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
@@ -49,7 +49,8 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint {
@UriParam(label = "consumer", description = "Defines where in the DynaboDB stream"
+ " to start getting records. Note that using TRIM_HORIZON can cause a"
+ " significant delay before the stream has caught up to real-time."
- + " Currently only LATEST and TRIM_HORIZON are supported.",
+ + " if {AT,AFTER}_SEQUENCE_NUMBER are used, then a sequenceNumberProvider"
+ + " MUST be supplied.",
defaultValue = "LATEST")
private ShardIteratorType iteratorType = ShardIteratorType.LATEST;
// TODO add the ability to use ShardIteratorType.{AT,AFTER}_SEQUENCE_NUMBER
@@ -61,6 +62,11 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint {
// Note that the shard list needs to have the ability to start at the shard
// that includes the supplied sequence number
+ @UriParam(label = "consumer", description = "Provider for the sequence number when"
+ + " using one of the two ShardIteratorType.{AT,AFTER}_SEQUENCE_NUMBER"
+ + " iterator types. Can be a registry reference or a literal sequence number.")
+ private SequenceNumberProvider sequenceNumberProvider;
+
public DdbStreamEndpoint(String uri, String tableName, DdbStreamComponent component) {
super(uri, component);
this.tableName = tableName;
@@ -90,13 +96,30 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint {
return true;
}
+ public String getSequenceNumber() {
+ switch (getIteratorType()) {
+ case AFTER_SEQUENCE_NUMBER:
+ case AT_SEQUENCE_NUMBER:
+ if (null == getSequenceNumberProvider()) {
+ throw new IllegalStateException("sequenceNumberProvider must be"
+ + " provided, either as an implementation of"
+ + " SequenceNumberProvider or a literal String.");
+ } else {
+ return getSequenceNumberProvider().getSequenceNumber();
+ }
+ default:
+ return "";
+ }
+ }
+
@Override
public String toString() {
return "DdbStreamEndpoint{"
+ "tableName=" + tableName
+ ", amazonDynamoDbStreamsClient=[redacted], maxResultsPerRequest=" + maxResultsPerRequest
- + ", iteratorType="
- + iteratorType + ", uri=" + getEndpointUri()
+ + ", iteratorType=" + iteratorType
+ + ", sequenceNumberProvider=" + sequenceNumberProvider
+ + ", uri=" + getEndpointUri()
+ '}';
}
@@ -135,5 +158,12 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint {
public void setIteratorType(ShardIteratorType iteratorType) {
this.iteratorType = iteratorType;
}
-
+
+ public SequenceNumberProvider getSequenceNumberProvider() {
+ return sequenceNumberProvider;
+ }
+
+ public void setSequenceNumberProvider(SequenceNumberProvider sequenceNumberProvider) {
+ this.sequenceNumberProvider = sequenceNumberProvider;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/SequenceNumberProvider.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/SequenceNumberProvider.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/SequenceNumberProvider.java
new file mode 100644
index 0000000..5a9dd8c
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/SequenceNumberProvider.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+public interface SequenceNumberProvider {
+ String getSequenceNumber();
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StaticSequenceNumberProvider.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StaticSequenceNumberProvider.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StaticSequenceNumberProvider.java
new file mode 100644
index 0000000..459767b
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StaticSequenceNumberProvider.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+public class StaticSequenceNumberProvider implements SequenceNumberProvider {
+
+ private final String sequenceNumber;
+
+ public StaticSequenceNumberProvider(String sequenceNumber) {
+ this.sequenceNumber = sequenceNumber;
+ }
+
+ @Override
+ public String getSequenceNumber() {
+ return sequenceNumber;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StringSequenceNumberConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StringSequenceNumberConverter.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StringSequenceNumberConverter.java
new file mode 100644
index 0000000..92bae2b
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StringSequenceNumberConverter.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+import org.apache.camel.Converter;
+
+@Converter
+public class StringSequenceNumberConverter {
+
+ private StringSequenceNumberConverter() {
+ }
+
+ @Converter
+ public static SequenceNumberProvider toSequenceNumberProvider(String sequenceNumber) {
+ return new StaticSequenceNumberProvider(sequenceNumber);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
index 4873a46..4472b59 100644
--- a/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
+++ b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
@@ -16,3 +16,4 @@
## ---------------------------------------------------------------------------
org.apache.camel.component.aws.kinesis.RecordStringConverter
+org.apache.camel.component.aws.ddbstream.StringSequenceNumberConverter
http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpointTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpointTest.java
new file mode 100644
index 0000000..9e688be
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpointTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
+import org.apache.camel.CamelContext;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import org.junit.Test;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DdbStreamEndpointTest {
+
+ private CamelContext context;
+
+ @Mock private SequenceNumberProvider sequenceNumberProvider;
+ @Mock private AmazonDynamoDBStreams amazonDynamoDBStreams;
+
+ @Rule public ExpectedException expectedException = ExpectedException.none();
+
+ @Before
+ public void setup() throws Exception {
+ SimpleRegistry registry = new SimpleRegistry();
+ registry.put("someSeqNumProv", sequenceNumberProvider);
+ registry.put("ddbStreamsClient", amazonDynamoDBStreams);
+
+ context = new DefaultCamelContext(registry);
+ }
+
+ @Test
+ public void itExtractsTheSequenceNumber() throws Exception {
+ when(sequenceNumberProvider.getSequenceNumber()).thenReturn("seq");
+
+ DdbStreamEndpoint undertest = (DdbStreamEndpoint)context.getEndpoint("aws-ddbstream://table"
+ + "?amazonDynamoDbStreamsClient=#ddbStreamsClient"
+ + "&iteratorType=AFTER_SEQUENCE_NUMBER"
+ + "&sequenceNumberProvider=#someSeqNumProv");
+
+ assertThat(undertest.getSequenceNumber(), is("seq"));
+ }
+
+ @Test
+ public void itExtractsTheSequenceNumberFromALiteralString() throws Exception {
+
+ DdbStreamEndpoint undertest = (DdbStreamEndpoint)context.getEndpoint("aws-ddbstream://table"
+ + "?amazonDynamoDbStreamsClient=#ddbStreamsClient"
+ + "&iteratorType=AFTER_SEQUENCE_NUMBER"
+ + "&sequenceNumberProvider=seq");
+
+ assertThat(undertest.getSequenceNumber(), is("seq"));
+ }
+
+ @Test
+ public void onSequenceNumberAgnosticIteratorsTheProviderIsIgnored() throws Exception {
+ when(sequenceNumberProvider.getSequenceNumber()).thenReturn("seq");
+
+ DdbStreamEndpoint undertest = (DdbStreamEndpoint)context.getEndpoint("aws-ddbstream://table"
+ + "?amazonDynamoDbStreamsClient=#ddbStreamsClient"
+ + "&iteratorType=LATEST"
+ + "&sequenceNumberProvider=#someSeqNumProv");
+
+ assertThat(undertest.getSequenceNumber(), is(""));
+ verify(sequenceNumberProvider, never()).getSequenceNumber();
+ }
+
+ @Test
+ public void sequenceNumberFetchingThrowsSomethingUsefulIfMisconfigurered() throws Exception {
+ when(sequenceNumberProvider.getSequenceNumber()).thenReturn("seq");
+
+ expectedException.expectMessage(containsString("sequenceNumberProvider"));
+
+ DdbStreamEndpoint undertest = (DdbStreamEndpoint)context.getEndpoint("aws-ddbstream://table"
+ + "?amazonDynamoDbStreamsClient=#ddbStreamsClient"
+ + "&iteratorType=AT_SEQUENCE_NUMBER"); // NOTE: missing sequence number provider parameter
+
+ undertest.getSequenceNumber();
+ }
+}
\ No newline at end of file