You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/12/16 14:22:46 UTC
[1/7] camel git commit: Add a data structure to model the way the
shard list works.
Repository: camel
Updated Branches:
refs/heads/master 3da84a6ba -> e3b86b977
Add a data structure to model the way the shard list works.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8f5136af
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8f5136af
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8f5136af
Branch: refs/heads/master
Commit: 8f5136af0ceb93db47858b6ba96b3388ea19c0e7
Parents: 78fd81e
Author: Candle <ca...@candle.me.uk>
Authored: Fri Dec 11 10:49:56 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 16 14:19:11 2015 +0100
----------------------------------------------------------------------
.../component/aws/ddbstream/ShardList.java | 57 ++++++++++
.../component/aws/ddbstream/ShardListTest.java | 104 +++++++++++++++++++
2 files changed, 161 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/8f5136af/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
new file mode 100644
index 0000000..fb188f4
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
@@ -0,0 +1,57 @@
+package org.apache.camel.component.aws.ddbstream;
+
+import com.amazonaws.services.dynamodbv2.model.Shard;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+class ShardList {
+
+ private final Map<String, Shard> shards = new HashMap<>();
+
+ void addAll(Collection<Shard> shards) {
+ for (Shard shard : shards) {
+ add(shard);
+ }
+ }
+
+ void add(Shard shard) {
+ shards.put(shard.getShardId(), shard);
+ }
+
+ Shard nextAfter(Shard previous) {
+ for (Shard shard : shards.values()) {
+ if (previous.getShardId().equals(shard.getParentShardId())) {
+ return shard;
+ }
+ }
+ throw new IllegalStateException("Unable to find the next shard for " + previous + " in " + shards);
+ }
+
+ Shard first() {
+ for (Shard shard : shards.values()) {
+ if (!shards.containsKey(shard.getParentShardId())) {
+ return shard;
+ }
+ }
+ throw new IllegalStateException("Unable to find an unparented shard in " + shards);
+ }
+
+ /**
+ * Removes shards that are older than the provided shard.
+ * Does not remove the provided shard.
+ * @param removeBefore the shard to keep; its chain of parent shards is removed from the list
+ */
+ void removeOlderThan(Shard removeBefore) {
+ String current = removeBefore.getParentShardId();
+
+ while (current != null) {
+ Shard s = shards.remove(current);
+ if (s == null) {
+ current = null;
+ } else {
+ current = s.getParentShardId();
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/8f5136af/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
new file mode 100644
index 0000000..773f4a0
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
@@ -0,0 +1,104 @@
+package org.apache.camel.component.aws.ddbstream;
+
+import com.amazonaws.services.dynamodbv2.model.Shard;
+import java.util.ArrayList;
+import java.util.List;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import org.junit.Test;
+
+public class ShardListTest {
+
+ @Test
+ public void nextReturnsShardWithParent() throws Exception {
+ Shard first = new Shard()
+ .withShardId("first_shard")
+ .withParentShardId("other_shard_id");
+ Shard second = new Shard()
+ .withParentShardId("first_shard")
+ .withShardId("second_shard");
+
+ ShardList shards = new ShardList();
+ shards.add(first);
+ shards.add(second);
+
+ assertThat(shards.nextAfter(first), is(second));
+ }
+
+ @Test
+ public void nextWithNullReturnsFirstKnownShard() throws Exception {
+ Shard first = new Shard()
+ .withShardId("first_shard");
+ Shard second = new Shard()
+ .withParentShardId("first_shard")
+ .withShardId("second_shard");
+
+ ShardList shards = new ShardList();
+ shards.add(first);
+ shards.add(second);
+
+ assertThat(shards.nextAfter(first), is(second));
+ }
+
+ @Test
+ public void reAddingEntriesMaintainsOrder() throws Exception {
+ Shard first = new Shard()
+ .withShardId("first_shard");
+ Shard second = new Shard()
+ .withParentShardId("first_shard")
+ .withShardId("second_shard");
+
+ ShardList shards = new ShardList();
+ shards.add(first);
+ shards.add(second);
+
+ assertThat(shards.nextAfter(first), is(second));
+
+ Shard second2 = new Shard()
+ .withParentShardId("first_shard")
+ .withShardId("second_shard");
+ Shard third = new Shard()
+ .withParentShardId("second_shard")
+ .withShardId("third_shard");
+ shards.add(second2);
+ shards.add(third);
+
+ assertThat(shards.nextAfter(first), is(second));
+ assertThat(shards.nextAfter(second), is(third));
+ }
+
+ @Test
+ public void firstShardGetsTheFirstWithoutAParent() throws Exception {
+ ShardList shards = new ShardList();
+ shards.addAll(createShards(null, "a", "b", "c", "d"));
+
+ assertThat(shards.first().getShardId(), is("a"));
+ }
+
+ @Test
+ public void firstShardGetsTheFirstWithAnUnknownParent() throws Exception {
+ ShardList shards = new ShardList();
+ shards.addAll(createShards("a", "b", "c", "d"));
+
+ assertThat(shards.first().getShardId(), is("b"));
+ }
+
+ @Test
+ public void removingShards() throws Exception {
+ ShardList shards = new ShardList();
+ shards.addAll(createShards(null, "a", "b", "c", "d"));
+ Shard removeBefore = new Shard().withShardId("c").withParentShardId("b");
+ shards.removeOlderThan(removeBefore);
+ assertThat(shards.first().getShardId(), is("c"));
+ }
+
+ List<Shard> createShards(String initialParent, String... shardIds) {
+ String previous = initialParent;
+ List<Shard> result = new ArrayList<>();
+ for (String s : shardIds) {
+ result.add(new Shard().withShardId(s).withParentShardId(previous));
+ previous = s;
+ }
+ return result;
+ }
+}
\ No newline at end of file
[2/7] camel git commit: Added basic DynamoDb Stream component.
Posted by da...@apache.org.
Added basic DynamoDb Stream component.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/78fd81e5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/78fd81e5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/78fd81e5
Branch: refs/heads/master
Commit: 78fd81e5f7861e6fbde3fb9d63519bc1e775c93c
Parents: 3da84a6
Author: Candle <ca...@candle.me.uk>
Authored: Mon Dec 7 09:33:10 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 16 14:19:11 2015 +0100
----------------------------------------------------------------------
.../aws/ddbstream/DdbStreamComponent.java | 44 +++++++
.../aws/ddbstream/DdbStreamConsumer.java | 128 +++++++++++++++++++
.../aws/ddbstream/DdbStreamEndpoint.java | 117 +++++++++++++++++
.../org/apache/camel/component/aws-ddbstream | 18 +++
4 files changed, 307 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/78fd81e5/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamComponent.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamComponent.java
new file mode 100644
index 0000000..559597b
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamComponent.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+import java.util.Map;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.UriEndpointComponent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DdbStreamComponent extends UriEndpointComponent {
+ private static final Logger LOG = LoggerFactory.getLogger(DdbStreamComponent.class);
+
+ public DdbStreamComponent() {
+ super(DdbStreamEndpoint.class);
+ }
+
+ public DdbStreamComponent(CamelContext context) {
+ super(context, DdbStreamEndpoint.class);
+ }
+
+ @Override
+ protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+ DdbStreamEndpoint endpoint = new DdbStreamEndpoint(uri, remaining, this);
+
+ LOG.debug("Created endpoint: {}", endpoint.toString());
+ return endpoint;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/78fd81e5/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
new file mode 100644
index 0000000..88d2ba5
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
+import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest;
+import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult;
+import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
+import com.amazonaws.services.dynamodbv2.model.GetRecordsResult;
+import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;
+import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult;
+import com.amazonaws.services.dynamodbv2.model.ListStreamsRequest;
+import com.amazonaws.services.dynamodbv2.model.ListStreamsResult;
+import com.amazonaws.services.dynamodbv2.model.Record;
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Queue;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.ScheduledBatchPollingConsumer;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DdbStreamConsumer extends ScheduledBatchPollingConsumer {
+ private static final Logger LOG = LoggerFactory.getLogger(DdbStreamConsumer.class);
+
+ private String currentShardIterator = null;
+
+ public DdbStreamConsumer(DdbStreamEndpoint endpoint, Processor processor) {
+ super(endpoint, processor);
+ }
+
+ /*
+ * Returns the number of messages polled.
+ */
+ @Override
+ protected int poll() throws Exception {
+ GetRecordsRequest req = new GetRecordsRequest()
+ .withShardIterator(getShardItertor())
+ .withLimit(getEndpoint().getMaxResultsPerRequest())
+ ;
+ GetRecordsResult result = getClient().getRecords(req);
+
+ Queue<Exchange> exchanges = createExchanges(result.getRecords());
+ int processedExchangeCount = processBatch(CastUtils.cast(exchanges));
+
+ currentShardIterator = result.getNextShardIterator();
+
+ return processedExchangeCount;
+ }
+
+ @Override
+ public int processBatch(Queue<Object> exchanges) throws Exception {
+ int processedExchanges = 0;
+ while (!exchanges.isEmpty()) {
+ final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
+
+ LOG.trace("Processing exchange [{}] started.", exchange);
+ getAsyncProcessor().process(exchange, new AsyncCallback() {
+ @Override
+ public void done(boolean doneSync) {
+ LOG.trace("Processing exchange [{}] done.", exchange);
+ }
+ });
+ processedExchanges++;
+ }
+ return processedExchanges;
+ }
+
+ private AmazonDynamoDBStreams getClient() {
+ return getEndpoint().getClient();
+ }
+
+ @Override
+ public DdbStreamEndpoint getEndpoint() {
+ return (DdbStreamEndpoint) super.getEndpoint();
+ }
+
+ private String getShardItertor() {
+ // either return a cached one or get a new one via a GetShardIterator request.
+ if (currentShardIterator == null) {
+ ListStreamsRequest req0 = new ListStreamsRequest()
+ .withTableName(getEndpoint().getTableName())
+ ;
+ ListStreamsResult res0 = getClient().listStreams(req0);
+ final String streamArn = res0.getStreams().get(0).getStreamArn(); // XXX assumes there is only one stream
+ DescribeStreamRequest req1 = new DescribeStreamRequest()
+ .withStreamArn(streamArn)
+ ;
+ DescribeStreamResult res1 = getClient().describeStream(req1);
+
+ GetShardIteratorRequest req = new GetShardIteratorRequest()
+ .withStreamArn(streamArn)
+ .withShardId(res1.getStreamDescription().getShards().get(0).getShardId()) // XXX only uses the first shard
+ .withShardIteratorType(getEndpoint().getIteratorType())
+ ;
+ GetShardIteratorResult result = getClient().getShardIterator(req);
+ currentShardIterator = result.getShardIterator();
+ }
+ LOG.trace("Shard Iterator is: {}", currentShardIterator);
+ return currentShardIterator;
+ }
+
+ private Queue<Exchange> createExchanges(List<Record> records) {
+ Queue<Exchange> exchanges = new ArrayDeque<>();
+ for (Record record : records) {
+ exchanges.add(getEndpoint().createExchange(record));
+ }
+ return exchanges;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/78fd81e5/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
new file mode 100644
index 0000000..18c042d
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
+import com.amazonaws.services.dynamodbv2.model.Record;
+import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.ScheduledPollEndpoint;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+
+@UriEndpoint(scheme = "aws-ddbstream", title = "AWS Kinesis", syntax = "aws-ddbstream:tableName", consumerClass = DdbStreamConsumer.class, label = "cloud,messaging,streams")
+public class DdbStreamEndpoint extends ScheduledPollEndpoint {
+
+ @UriPath(label = "consumer,producer", description = "Name of the dynamodb table")
+ @Metadata(required = "true")
+ private String tableName;
+
+ // For now, always assume that we've been supplied a client in the Camel registry.
+ @UriParam(label = "consumer", description = "Amazon DynamoDB client to use for all requests for this endpoint")
+ @Metadata(required = "true")
+ private AmazonDynamoDBStreams amazonDynamoDbStreamsClient;
+
+ @UriParam(label = "consumer", description = "Maximum number of records that will be fetched in each poll")
+ private int maxResultsPerRequest = 1;
+
+ @UriParam(label = "consumer", description = "Defines where in the DynaboDB stream to start getting records")
+ private ShardIteratorType iteratorType = ShardIteratorType.TRIM_HORIZON;
+
+ public DdbStreamEndpoint(String uri, String tableName, DdbStreamComponent component) {
+ super(uri, component);
+ this.tableName = tableName;
+ }
+
+ @Override
+ public Producer createProducer() throws Exception {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public Consumer createConsumer(Processor processor) throws Exception {
+ DdbStreamConsumer consumer = new DdbStreamConsumer(this, processor);
+ consumer.setSchedulerProperties(consumer.getEndpoint().getSchedulerProperties());
+ return consumer;
+ }
+
+ Exchange createExchange(Record record) {
+ Exchange ex = super.createExchange();
+ ex.getIn().setBody(record, Record.class);
+
+ return ex;
+ }
+
+ @Override
+ public boolean isSingleton() {
+ // probably right.
+ return true;
+ }
+
+ AmazonDynamoDBStreams getClient() {
+ return amazonDynamoDbStreamsClient;
+ }
+
+ // required for injection.
+ public AmazonDynamoDBStreams getAmazonDynamoDBStreamsClient() {
+ return amazonDynamoDbStreamsClient;
+ }
+
+ public void setAmazonDynamoDbStreamsClient(AmazonDynamoDBStreams amazonDynamoDbStreamsClient) {
+ this.amazonDynamoDbStreamsClient = amazonDynamoDbStreamsClient;
+ }
+
+ public int getMaxResultsPerRequest() {
+ return maxResultsPerRequest;
+ }
+
+ public void setMaxResultsPerRequest(int maxResultsPerRequest) {
+ this.maxResultsPerRequest = maxResultsPerRequest;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
+
+ public ShardIteratorType getIteratorType() {
+ return iteratorType;
+ }
+
+ public void setIteratorType(ShardIteratorType iteratorType) {
+ this.iteratorType = iteratorType;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/78fd81e5/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-ddbstream
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-ddbstream b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-ddbstream
new file mode 100644
index 0000000..48a8509
--- /dev/null
+++ b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-ddbstream
@@ -0,0 +1,18 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+class=org.apache.camel.component.aws.ddbstream.DdbStreamComponent
[6/7] camel git commit: Add an understanding of the shard list to the
main dynamodbstream consumer.
Posted by da...@apache.org.
Add an understanding of the shard list to the main dynamodbstream consumer.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ceaef105
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ceaef105
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ceaef105
Branch: refs/heads/master
Commit: ceaef105323d031ef92751d2744ad7a0f0be2c08
Parents: fdadd15
Author: Candle <ca...@candle.me.uk>
Authored: Fri Dec 11 16:25:48 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 16 14:19:12 2015 +0100
----------------------------------------------------------------------
.../component/aws/ddbstream/DdbStreamConsumer.java | 15 ++++++++++++++-
.../camel/component/aws/ddbstream/ShardList.java | 11 +++++++++++
2 files changed, 25 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/ceaef105/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
index 88d2ba5..d4c9ac7 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
@@ -26,6 +26,7 @@ import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult;
import com.amazonaws.services.dynamodbv2.model.ListStreamsRequest;
import com.amazonaws.services.dynamodbv2.model.ListStreamsResult;
import com.amazonaws.services.dynamodbv2.model.Record;
+import com.amazonaws.services.dynamodbv2.model.Shard;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
@@ -42,6 +43,8 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer {
private static final Logger LOG = LoggerFactory.getLogger(DdbStreamConsumer.class);
private String currentShardIterator = null;
+ private Shard currentShard;
+ private ShardList shardList = new ShardList();
public DdbStreamConsumer(DdbStreamEndpoint endpoint, Processor processor) {
super(endpoint, processor);
@@ -105,10 +108,20 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer {
.withStreamArn(streamArn)
;
DescribeStreamResult res1 = getClient().describeStream(req1);
+ shardList.addAll(res1.getStreamDescription().getShards());
+
+ LOG.trace("Current shard is: {} (in {})", currentShard, shardList);
+ if (currentShard == null) {
+ currentShard = shardList.first();
+ } else {
+ currentShard = shardList.nextAfter(currentShard);
+ }
+ shardList.removeOlderThan(currentShard);
+ LOG.trace("Next shard is: {} (in {})", currentShard, shardList);
GetShardIteratorRequest req = new GetShardIteratorRequest()
.withStreamArn(streamArn)
- .withShardId(res1.getStreamDescription().getShards().get(0).getShardId()) // XXX only uses the first shard
+ .withShardId(currentShard.getShardId())
.withShardIteratorType(getEndpoint().getIteratorType())
;
GetShardIteratorResult result = getClient().getShardIterator(req);
http://git-wip-us.apache.org/repos/asf/camel/blob/ceaef105/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
index fb188f4..a85e703 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
@@ -4,8 +4,11 @@ import com.amazonaws.services.dynamodbv2.model.Shard;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class ShardList {
+ private final Logger LOG = LoggerFactory.getLogger(ShardList.class);
private final Map<String, Shard> shards = new HashMap<>();
@@ -45,13 +48,21 @@ class ShardList {
void removeOlderThan(Shard removeBefore) {
String current = removeBefore.getParentShardId();
+ int removedShards = 0;
while (current != null) {
Shard s = shards.remove(current);
if (s == null) {
current = null;
} else {
+ removedShards++;
current = s.getParentShardId();
}
}
+ LOG.trace("removed {} shards from the store, new size is {}", removedShards, shards.size());
+ }
+
+ @Override
+ public String toString() {
+ return "ShardList{" + "shards=" + shards + '}';
}
}
\ No newline at end of file
[4/7] camel git commit: Added more documentation.
Posted by da...@apache.org.
Added more documentation.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/71808938
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/71808938
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/71808938
Branch: refs/heads/master
Commit: 7180893861195c2971f046f1a4b403a80c27ec28
Parents: 51e251c
Author: Candle <ca...@candle.me.uk>
Authored: Wed Dec 16 09:05:00 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 16 14:19:12 2015 +0100
----------------------------------------------------------------------
.../component/aws/ddbstream/DdbStreamEndpoint.java | 16 +++++++++++++---
1 file changed, 13 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/71808938/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
index dfcc6cc..66a7461 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
@@ -29,10 +29,10 @@ import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
-@UriEndpoint(scheme = "aws-ddbstream", title = "AWS Kinesis", syntax = "aws-ddbstream:tableName", consumerClass = DdbStreamConsumer.class, label = "cloud,messaging,streams")
+@UriEndpoint(scheme = "aws-ddbstream", title = "AWS DynamoDB Streams", consumerOnly = true, syntax = "aws-ddbstream:tableName", consumerClass = DdbStreamConsumer.class, label = "cloud,messaging,streams")
public class DdbStreamEndpoint extends ScheduledPollEndpoint {
- @UriPath(label = "consumer,producer", description = "Name of the dynamodb table")
+ @UriPath(label = "consumer", description = "Name of the dynamodb table")
@Metadata(required = "true")
private String tableName;
@@ -44,8 +44,18 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint {
@UriParam(label = "consumer", description = "Maximum number of records that will be fetched in each poll")
private int maxResultsPerRequest = 100;
- @UriParam(label = "consumer", description = "Defines where in the DynaboDB stream to start getting records", defaultValue = "LATEST")
+ @UriParam(label = "consumer", description = "Defines where in the DynaboDB stream"
+ + " to start getting records. Note that using TRIM_HORIZON can cause a"
+ + " significant delay before the stream has caught up to real-time."
+ + " Currently only LATEST and TRIM_HORIZON are supported.",
+ defaultValue = "LATEST")
private ShardIteratorType iteratorType = ShardIteratorType.LATEST;
+ // TODO add the ability to use ShardIteratorType.{AT,AFTER}_SEQUENCE_NUMBER
+ // by specifying either a sequence number itself or a bean to fetch the
+ // sequence number from persistent storage or somewhere else.
+ // This can be done by having the type of the parameter an interface
+ // and supplying a default implementation and a converter from a long/String
+ // to an instance of this interface.
public DdbStreamEndpoint(String uri, String tableName, DdbStreamComponent component) {
super(uri, component);
[5/7] camel git commit: Update defaults to more sensible defaults.
Posted by da...@apache.org.
Update defaults to more sensible defaults.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fdadd159
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fdadd159
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fdadd159
Branch: refs/heads/master
Commit: fdadd1594b26c63ca22c67044b2fb9fdadd58c5a
Parents: 8f5136a
Author: Candle <ca...@candle.me.uk>
Authored: Fri Dec 11 16:25:19 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 16 14:19:12 2015 +0100
----------------------------------------------------------------------
.../camel/component/aws/ddbstream/DdbStreamEndpoint.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/fdadd159/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
index 18c042d..35812ce 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
@@ -42,10 +42,10 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint {
private AmazonDynamoDBStreams amazonDynamoDbStreamsClient;
@UriParam(label = "consumer", description = "Maximum number of records that will be fetched in each poll")
- private int maxResultsPerRequest = 1;
+ private int maxResultsPerRequest = 100;
- @UriParam(label = "consumer", description = "Defines where in the DynaboDB stream to start getting records")
- private ShardIteratorType iteratorType = ShardIteratorType.TRIM_HORIZON;
+ @UriParam(label = "consumer", description = "Defines where in the DynaboDB stream to start getting records", defaultValue = "LATEST")
+ private ShardIteratorType iteratorType = ShardIteratorType.LATEST;
public DdbStreamEndpoint(String uri, String tableName, DdbStreamComponent component) {
super(uri, component);
[7/7] camel git commit: Correctly support the LATEST shard iterator
type by starting with the last shard in the stream description.
Posted by da...@apache.org.
Correctly support the LATEST shard iterator type by starting with the last shard in the stream description.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e3b86b97
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e3b86b97
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e3b86b97
Branch: refs/heads/master
Commit: e3b86b977bbcda16d35c936dab77ed7b07f90e6e
Parents: 7180893
Author: Candle <ca...@candle.me.uk>
Authored: Wed Dec 16 10:34:21 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 16 14:19:13 2015 +0100
----------------------------------------------------------------------
.../component/aws/ddbstream/DdbStreamConsumer.java | 10 +++++++++-
.../component/aws/ddbstream/DdbStreamEndpoint.java | 16 +++++++++++++++-
.../camel/component/aws/ddbstream/ShardList.java | 13 +++++++++++++
.../component/aws/ddbstream/ShardListTest.java | 8 ++++++++
4 files changed, 45 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/e3b86b97/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
index 0a6a83c..f5223c0 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
@@ -107,7 +107,15 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer {
LOG.trace("Current shard is: {} (in {})", currentShard, shardList);
if (currentShard == null) {
- currentShard = shardList.first();
+ switch(getEndpoint().getIteratorType()) {
+ case TRIM_HORIZON:
+ currentShard = shardList.first();
+ break;
+ default:
+ case LATEST:
+ currentShard = shardList.last();
+ break;
+ }
} else {
currentShard = shardList.nextAfter(currentShard);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e3b86b97/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
index 66a7461..543c432 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
@@ -29,7 +29,9 @@ import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
-@UriEndpoint(scheme = "aws-ddbstream", title = "AWS DynamoDB Streams", consumerOnly = true, syntax = "aws-ddbstream:tableName", consumerClass = DdbStreamConsumer.class, label = "cloud,messaging,streams")
+@UriEndpoint(scheme = "aws-ddbstream", title = "AWS DynamoDB Streams",
+ consumerOnly = true, syntax = "aws-ddbstream:tableName",
+ consumerClass = DdbStreamConsumer.class, label = "cloud,messaging,streams")
public class DdbStreamEndpoint extends ScheduledPollEndpoint {
@UriPath(label = "consumer", description = "Name of the dynamodb table")
@@ -56,6 +58,8 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint {
// This can be done by having the type of the parameter an interface
// and supplying a default implementation and a converter from a long/String
// to an instance of this interface.
+ // Note that the shard list needs to have the ability to start at the shard
+ // that includes the supplied sequence number
public DdbStreamEndpoint(String uri, String tableName, DdbStreamComponent component) {
super(uri, component);
@@ -86,6 +90,16 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint {
return true;
}
+ @Override
+ public String toString() {
+ return "DdbStreamEndpoint{"
+ + "tableName=" + tableName
+ + ", amazonDynamoDbStreamsClient=[redacted], maxResultsPerRequest=" + maxResultsPerRequest
+ + ", iteratorType="
+ + iteratorType + ", uri=" + getEndpointUri()
+ + '}';
+ }
+
AmazonDynamoDBStreams getClient() {
return amazonDynamoDbStreamsClient;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e3b86b97/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
index 6e804f5..a0df179 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
@@ -57,6 +57,19 @@ class ShardList {
throw new IllegalStateException("Unable to find an unparented shard in " + shards);
}
+ Shard last() {
+ Map<String, Shard> shardsByParent = new HashMap<>();
+ for (Shard shard : shards.values()) {
+ shardsByParent.put(shard.getParentShardId(), shard);
+ }
+ for (Shard shard : shards.values()) {
+ if (!shardsByParent.containsKey(shard.getShardId())) {
+ return shard;
+ }
+ }
+ throw new IllegalStateException("Unable to find a shard with no children " + shards);
+ }
+
/**
* Removes shards that are older than the provided shard.
* Does not remove the provided shard.
http://git-wip-us.apache.org/repos/asf/camel/blob/e3b86b97/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
index 60f3d46..1b7249a 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
@@ -102,6 +102,14 @@ public class ShardListTest {
}
@Test
+ public void lastShardGetsTheShardWithNoChildren() throws Exception {
+ ShardList shards = new ShardList();
+ shards.addAll(createShards("a", "b", "c", "d"));
+
+ assertThat(shards.last().getShardId(), is("d"));
+ }
+
+ @Test
public void removingShards() throws Exception {
ShardList shards = new ShardList();
shards.addAll(createShards(null, "a", "b", "c", "d"));
[3/7] camel git commit: Fixed CS.
Posted by da...@apache.org.
Fixed CS.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/51e251c7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/51e251c7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/51e251c7
Branch: refs/heads/master
Commit: 51e251c700dfd1bdb0f443d9e42d1281890338dd
Parents: ceaef10
Author: Candle <ca...@candle.me.uk>
Authored: Tue Dec 15 18:22:20 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 16 14:19:12 2015 +0100
----------------------------------------------------------------------
.../aws/ddbstream/DdbStreamConsumer.java | 26 ++++++++------------
.../aws/ddbstream/DdbStreamEndpoint.java | 2 --
.../component/aws/ddbstream/ShardList.java | 23 ++++++++++++++---
.../component/aws/ddbstream/ShardListTest.java | 22 +++++++++++++++--
4 files changed, 50 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/51e251c7/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
index d4c9ac7..0a6a83c 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
@@ -16,6 +16,10 @@
*/
package org.apache.camel.component.aws.ddbstream;
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Queue;
+
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult;
@@ -27,9 +31,6 @@ import com.amazonaws.services.dynamodbv2.model.ListStreamsRequest;
import com.amazonaws.services.dynamodbv2.model.ListStreamsResult;
import com.amazonaws.services.dynamodbv2.model.Record;
import com.amazonaws.services.dynamodbv2.model.Shard;
-import java.util.ArrayDeque;
-import java.util.List;
-import java.util.Queue;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -42,23 +43,19 @@ import org.slf4j.LoggerFactory;
public class DdbStreamConsumer extends ScheduledBatchPollingConsumer {
private static final Logger LOG = LoggerFactory.getLogger(DdbStreamConsumer.class);
- private String currentShardIterator = null;
+ private String currentShardIterator;
private Shard currentShard;
- private ShardList shardList = new ShardList();
+ private final ShardList shardList = new ShardList();
public DdbStreamConsumer(DdbStreamEndpoint endpoint, Processor processor) {
super(endpoint, processor);
}
- /*
- * Returns the number of messages polled.
- */
@Override
protected int poll() throws Exception {
GetRecordsRequest req = new GetRecordsRequest()
.withShardIterator(getShardItertor())
- .withLimit(getEndpoint().getMaxResultsPerRequest())
- ;
+ .withLimit(getEndpoint().getMaxResultsPerRequest());
GetRecordsResult result = getClient().getRecords(req);
Queue<Exchange> exchanges = createExchanges(result.getRecords());
@@ -100,13 +97,11 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer {
// either return a cached one or get a new one via a GetShardIterator request.
if (currentShardIterator == null) {
ListStreamsRequest req0 = new ListStreamsRequest()
- .withTableName(getEndpoint().getTableName())
- ;
+ .withTableName(getEndpoint().getTableName());
ListStreamsResult res0 = getClient().listStreams(req0);
final String streamArn = res0.getStreams().get(0).getStreamArn(); // XXX assumes there is only one stream
DescribeStreamRequest req1 = new DescribeStreamRequest()
- .withStreamArn(streamArn)
- ;
+ .withStreamArn(streamArn);
DescribeStreamResult res1 = getClient().describeStream(req1);
shardList.addAll(res1.getStreamDescription().getShards());
@@ -122,8 +117,7 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer {
GetShardIteratorRequest req = new GetShardIteratorRequest()
.withStreamArn(streamArn)
.withShardId(currentShard.getShardId())
- .withShardIteratorType(getEndpoint().getIteratorType())
- ;
+ .withShardIteratorType(getEndpoint().getIteratorType());
GetShardIteratorResult result = getClient().getShardIterator(req);
currentShardIterator = result.getShardIterator();
}
http://git-wip-us.apache.org/repos/asf/camel/blob/51e251c7/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
index 35812ce..dfcc6cc 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
@@ -73,7 +73,6 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint {
@Override
public boolean isSingleton() {
- // probably right.
return true;
}
@@ -81,7 +80,6 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint {
return amazonDynamoDbStreamsClient;
}
- // required for injection.
public AmazonDynamoDBStreams getAmazonDynamoDBStreamsClient() {
return amazonDynamoDbStreamsClient;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/51e251c7/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
index a85e703..6e804f5 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
@@ -1,14 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.camel.component.aws.ddbstream;
-import com.amazonaws.services.dynamodbv2.model.Shard;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+
+import com.amazonaws.services.dynamodbv2.model.Shard;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class ShardList {
- private final Logger LOG = LoggerFactory.getLogger(ShardList.class);
+ private final Logger log = LoggerFactory.getLogger(ShardList.class);
private final Map<String, Shard> shards = new HashMap<>();
@@ -58,7 +75,7 @@ class ShardList {
current = s.getParentShardId();
}
}
- LOG.trace("removed {} shards from the store, new size is {}", removedShards, shards.size());
+ log.trace("removed {} shards from the store, new size is {}", removedShards, shards.size());
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/51e251c7/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
index 773f4a0..60f3d46 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
@@ -1,11 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.camel.component.aws.ddbstream;
-import com.amazonaws.services.dynamodbv2.model.Shard;
+
import java.util.ArrayList;
import java.util.List;
+import com.amazonaws.services.dynamodbv2.model.Shard;
+
+import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
-import org.junit.Test;
public class ShardListTest {