You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/12/16 14:22:46 UTC

[1/7] camel git commit: Add a data structure to model the way the shard list works.

Repository: camel
Updated Branches:
  refs/heads/master 3da84a6ba -> e3b86b977


Add a data structure to model the way the shard list works.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8f5136af
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8f5136af
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8f5136af

Branch: refs/heads/master
Commit: 8f5136af0ceb93db47858b6ba96b3388ea19c0e7
Parents: 78fd81e
Author: Candle <ca...@candle.me.uk>
Authored: Fri Dec 11 10:49:56 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 16 14:19:11 2015 +0100

----------------------------------------------------------------------
 .../component/aws/ddbstream/ShardList.java      |  57 ++++++++++
 .../component/aws/ddbstream/ShardListTest.java  | 104 +++++++++++++++++++
 2 files changed, 161 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8f5136af/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
new file mode 100644
index 0000000..fb188f4
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
@@ -0,0 +1,57 @@
+package org.apache.camel.component.aws.ddbstream;
+
+import com.amazonaws.services.dynamodbv2.model.Shard;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+class ShardList {
+
+    private final Map<String, Shard> shards = new HashMap<>();
+
+    void addAll(Collection<Shard> shards) { // adds each shard of the collection through add(Shard)
+        for (Shard shard : shards) {
+            add(shard);
+        }
+    }
+
+    void add(Shard shard) { // keyed by shard ID, so a shard already stored under the same ID is overwritten
+        shards.put(shard.getShardId(), shard);
+    }
+
+    Shard nextAfter(Shard previous) { // returns a stored shard whose parent shard ID equals the shard ID of previous
+        for (Shard shard : shards.values()) {
+            if (previous.getShardId().equals(shard.getParentShardId())) {
+                return shard;
+            }
+        }
+        throw new IllegalStateException("Unable to find the next shard for " + previous + " in " + shards); // no stored shard names previous as its parent
+    }
+
+    Shard first() { // returns a stored shard whose parent shard ID is not a key of the map
+        for (Shard shard : shards.values()) {
+            if (!shards.containsKey(shard.getParentShardId())) { // the parent is not held in this list
+                return shard;
+            }
+        }
+        throw new IllegalStateException("Unable to find an unparented shard in " + shards); // also reached when the map is empty
+    }
+
+    /**
+     * Removes shards that are older than the provided shard.
+     * Does not remove the provided shard.
+     * @param removeBefore shard whose ancestors are removed, following parent shard IDs until one is not in the map
+     */
+    void removeOlderThan(Shard removeBefore) {
+        String current = removeBefore.getParentShardId();
+
+        while (current != null) {
+            Shard s = shards.remove(current);
+            if (s == null) {
+                current = null;
+            } else {
+                current = s.getParentShardId();
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/8f5136af/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
new file mode 100644
index 0000000..773f4a0
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
@@ -0,0 +1,104 @@
+package org.apache.camel.component.aws.ddbstream;
+
+import com.amazonaws.services.dynamodbv2.model.Shard;
+import java.util.ArrayList;
+import java.util.List;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import org.junit.Test;
+
+public class ShardListTest { // unit tests for ShardList: nextAfter, first and removeOlderThan
+
+    @Test
+    public void nextReturnsShardWithParent() throws Exception { // first's own parent ("other_shard_id") is never added to the list
+        Shard first = new Shard()
+                .withShardId("first_shard")
+                .withParentShardId("other_shard_id");
+        Shard second = new Shard()
+                .withParentShardId("first_shard")
+                .withShardId("second_shard");
+
+        ShardList shards = new ShardList();
+        shards.add(first);
+        shards.add(second);
+
+        assertThat(shards.nextAfter(first), is(second));
+    }
+
+    @Test
+    public void nextWithNullReturnsFirstKnownShard() throws Exception { // NOTE(review): despite the name, nextAfter is called with a non-null shard that merely has no parent ID
+        Shard first = new Shard()
+                .withShardId("first_shard");
+        Shard second = new Shard()
+                .withParentShardId("first_shard")
+                .withShardId("second_shard");
+
+        ShardList shards = new ShardList();
+        shards.add(first);
+        shards.add(second);
+
+        assertThat(shards.nextAfter(first), is(second));
+    }
+
+    @Test
+    public void reAddingEntriesMaintainsOrder() throws Exception { // adds a shard under an ID that is already stored, then checks the lookups again
+        Shard first = new Shard()
+                .withShardId("first_shard");
+        Shard second = new Shard()
+                .withParentShardId("first_shard")
+                .withShardId("second_shard");
+
+        ShardList shards = new ShardList();
+        shards.add(first);
+        shards.add(second);
+
+        assertThat(shards.nextAfter(first), is(second));
+
+        Shard second2 = new Shard()
+                .withParentShardId("first_shard")
+                .withShardId("second_shard");
+        Shard third = new Shard()
+                .withParentShardId("second_shard")
+                .withShardId("third_shard");
+        shards.add(second2);
+        shards.add(third);
+
+        assertThat(shards.nextAfter(first), is(second)); // second2 now occupies the "second_shard" key; presumably passes because Shard.equals compares field values - TODO confirm
+        assertThat(shards.nextAfter(second), is(third));
+    }
+
+    @Test
+    public void firstShardGetsTheFirstWithoutAParent() throws Exception { // chain a -> b -> c -> d where "a" has a null parent ID
+        ShardList shards = new ShardList();
+        shards.addAll(createShards(null, "a", "b", "c", "d"));
+
+        assertThat(shards.first().getShardId(), is("a"));
+    }
+
+    @Test
+    public void firstShardGetsTheFirstWithAnUnknownParent() throws Exception { // "a" is only the initial parent ID here, it is not added as a shard
+        ShardList shards = new ShardList();
+        shards.addAll(createShards("a", "b", "c", "d"));
+
+        assertThat(shards.first().getShardId(), is("b"));
+    }
+
+    @Test
+    public void removingShards() throws Exception { // after removing everything older than "c", "c" must be reported as the first shard
+        ShardList shards = new ShardList();
+        shards.addAll(createShards(null, "a", "b", "c", "d"));
+        Shard removeBefore = new Shard().withShardId("c").withParentShardId("b");
+        shards.removeOlderThan(removeBefore);
+        assertThat(shards.first().getShardId(), is("c"));
+    }
+
+    List<Shard> createShards(String initialParent, String... shardIds) { // builds a chain: the first shard gets initialParent as parent, each later shard gets the preceding ID
+        String previous = initialParent;
+        List<Shard> result = new ArrayList<>();
+        for (String s : shardIds) {
+            result.add(new Shard().withShardId(s).withParentShardId(previous));
+            previous = s;
+        }
+        return result;
+    }
+}
\ No newline at end of file


[2/7] camel git commit: Added basic DynamoDb Stream component.

Posted by da...@apache.org.
Added basic DynamoDb Stream component.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/78fd81e5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/78fd81e5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/78fd81e5

Branch: refs/heads/master
Commit: 78fd81e5f7861e6fbde3fb9d63519bc1e775c93c
Parents: 3da84a6
Author: Candle <ca...@candle.me.uk>
Authored: Mon Dec 7 09:33:10 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 16 14:19:11 2015 +0100

----------------------------------------------------------------------
 .../aws/ddbstream/DdbStreamComponent.java       |  44 +++++++
 .../aws/ddbstream/DdbStreamConsumer.java        | 128 +++++++++++++++++++
 .../aws/ddbstream/DdbStreamEndpoint.java        | 117 +++++++++++++++++
 .../org/apache/camel/component/aws-ddbstream    |  18 +++
 4 files changed, 307 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/78fd81e5/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamComponent.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamComponent.java
new file mode 100644
index 0000000..559597b
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamComponent.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+import java.util.Map;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.UriEndpointComponent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DdbStreamComponent extends UriEndpointComponent { // component whose endpoints are DdbStreamEndpoint instances
+    private static final Logger LOG = LoggerFactory.getLogger(DdbStreamComponent.class);
+
+    public DdbStreamComponent() {
+        super(DdbStreamEndpoint.class);
+    }
+
+    public DdbStreamComponent(CamelContext context) {
+        super(context, DdbStreamEndpoint.class);
+    }
+
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { // parameters is not read in this method
+        DdbStreamEndpoint endpoint = new DdbStreamEndpoint(uri, remaining, this); // remaining becomes the endpoint's tableName
+
+        LOG.debug("Created endpoint: {}", endpoint.toString()); // NOTE(review): the explicit toString() runs even when debug logging is disabled
+        return endpoint;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/78fd81e5/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
new file mode 100644
index 0000000..88d2ba5
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
+import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest;
+import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult;
+import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
+import com.amazonaws.services.dynamodbv2.model.GetRecordsResult;
+import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;
+import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult;
+import com.amazonaws.services.dynamodbv2.model.ListStreamsRequest;
+import com.amazonaws.services.dynamodbv2.model.ListStreamsResult;
+import com.amazonaws.services.dynamodbv2.model.Record;
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Queue;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.ScheduledBatchPollingConsumer;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DdbStreamConsumer extends ScheduledBatchPollingConsumer { // fetches stream records through a shard iterator and turns each record into an exchange
+    private static final Logger LOG = LoggerFactory.getLogger(DdbStreamConsumer.class);
+
+    private String currentShardIterator = null;
+
+    public DdbStreamConsumer(DdbStreamEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+    }
+
+    /*
+     * Fetches one batch of records, processes them and returns the number of messages polled.
+     */
+    @Override
+    protected int poll() throws Exception {
+        GetRecordsRequest req = new GetRecordsRequest()
+                .withShardIterator(getShardItertor())
+                .withLimit(getEndpoint().getMaxResultsPerRequest())
+                ;
+        GetRecordsResult result = getClient().getRecords(req);
+
+        Queue<Exchange> exchanges = createExchanges(result.getRecords());
+        int processedExchangeCount = processBatch(CastUtils.cast(exchanges));
+
+        currentShardIterator = result.getNextShardIterator(); // reused by the next poll; when it is null a new iterator is requested instead
+
+        return processedExchangeCount;
+    }
+
+    @Override
+    public int processBatch(Queue<Object> exchanges) throws Exception { // drains the queue, handing every exchange to the async processor, and returns how many were handed over
+        int processedExchanges = 0;
+        while (!exchanges.isEmpty()) {
+            final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
+
+            LOG.trace("Processing exchange [{}] started.", exchange);
+            getAsyncProcessor().process(exchange, new AsyncCallback() {
+                @Override
+                public void done(boolean doneSync) { // the callback only logs completion
+                    LOG.trace("Processing exchange [{}] done.", exchange);
+                }
+            });
+            processedExchanges++; // counted once handed to the processor, independent of the callback
+        }
+        return processedExchanges;
+    }
+
+    private AmazonDynamoDBStreams getClient() { // the client is owned by the endpoint
+        return getEndpoint().getClient();
+    }
+
+    @Override
+    public DdbStreamEndpoint getEndpoint() {
+        return (DdbStreamEndpoint) super.getEndpoint();
+    }
+
+    private String getShardItertor() {
+        // either return a cached one or get a new one via a GetShardIterator request.
+        if (currentShardIterator == null) {
+            ListStreamsRequest req0 = new ListStreamsRequest()
+                    .withTableName(getEndpoint().getTableName())
+                    ;
+            ListStreamsResult res0 = getClient().listStreams(req0);
+            final String streamArn = res0.getStreams().get(0).getStreamArn(); // XXX assumes there is only one stream; NOTE(review): get(0) throws if the list is empty
+            DescribeStreamRequest req1 = new DescribeStreamRequest()
+                    .withStreamArn(streamArn)
+                    ;
+            DescribeStreamResult res1 = getClient().describeStream(req1);
+
+            GetShardIteratorRequest req = new GetShardIteratorRequest()
+                    .withStreamArn(streamArn)
+                    .withShardId(res1.getStreamDescription().getShards().get(0).getShardId()) // XXX only uses the first shard
+                    .withShardIteratorType(getEndpoint().getIteratorType())
+                    ;
+            GetShardIteratorResult result = getClient().getShardIterator(req);
+            currentShardIterator = result.getShardIterator(); // cache it so later calls skip the three requests above
+        }
+        LOG.trace("Shard Iterator is: {}", currentShardIterator);
+        return currentShardIterator;
+    }
+
+    private Queue<Exchange> createExchanges(List<Record> records) { // one exchange per record, queued in the order of the list
+        Queue<Exchange> exchanges = new ArrayDeque<>();
+        for (Record record : records) {
+            exchanges.add(getEndpoint().createExchange(record));
+        }
+        return exchanges;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/78fd81e5/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
new file mode 100644
index 0000000..18c042d
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
+import com.amazonaws.services.dynamodbv2.model.Record;
+import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.ScheduledPollEndpoint;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+
+@UriEndpoint(scheme = "aws-ddbstream", title = "AWS Kinesis", syntax = "aws-ddbstream:tableName", consumerClass = DdbStreamConsumer.class, label = "cloud,messaging,streams")
+public class DdbStreamEndpoint extends ScheduledPollEndpoint {
+
+    @UriPath(label = "consumer,producer", description = "Name of the dynamodb table")
+    @Metadata(required = "true")
+    private String tableName;
+
+    // For now, always assume that we've been supplied a client in the Camel registry.
+    @UriParam(label = "consumer", description = "Amazon DynamoDB client to use for all requests for this endpoint")
+    @Metadata(required = "true")
+    private AmazonDynamoDBStreams amazonDynamoDbStreamsClient;
+
+    @UriParam(label = "consumer", description = "Maximum number of records that will be fetched in each poll")
+    private int maxResultsPerRequest = 1;
+
+    @UriParam(label = "consumer", description = "Defines where in the DynaboDB stream to start getting records")
+    private ShardIteratorType iteratorType = ShardIteratorType.TRIM_HORIZON;
+
+    public DdbStreamEndpoint(String uri, String tableName, DdbStreamComponent component) {
+        super(uri, component);
+        this.tableName = tableName;
+    }
+
+    @Override
+    public Producer createProducer() throws Exception { // no producer exists for this endpoint; the call always throws
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception { // builds a DdbStreamConsumer bound to this endpoint and the given processor
+        DdbStreamConsumer consumer = new DdbStreamConsumer(this, processor);
+        consumer.setSchedulerProperties(consumer.getEndpoint().getSchedulerProperties()); // consumer.getEndpoint() is presumably this endpoint - TODO confirm
+        return consumer;
+    }
+
+    Exchange createExchange(Record record) { // new exchange whose In message body is the given stream record
+        Exchange ex = super.createExchange();
+        ex.getIn().setBody(record, Record.class);
+
+        return ex;
+    }
+
+    @Override
+    public boolean isSingleton() {
+        // probably right.
+        return true;
+    }
+
+    AmazonDynamoDBStreams getClient() { // package-private accessor, called by DdbStreamConsumer
+        return amazonDynamoDbStreamsClient;
+    }
+
+    // required for injection. NOTE(review): this getter spells "DynamoDB" while the setter below spells "DynamoDb" - confirm the mismatch is intended.
+    public AmazonDynamoDBStreams getAmazonDynamoDBStreamsClient() {
+        return amazonDynamoDbStreamsClient;
+    }
+
+    public void setAmazonDynamoDbStreamsClient(AmazonDynamoDBStreams amazonDynamoDbStreamsClient) {
+        this.amazonDynamoDbStreamsClient = amazonDynamoDbStreamsClient;
+    }
+
+    public int getMaxResultsPerRequest() {
+        return maxResultsPerRequest;
+    }
+
+    public void setMaxResultsPerRequest(int maxResultsPerRequest) {
+        this.maxResultsPerRequest = maxResultsPerRequest;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
+    }
+
+    public ShardIteratorType getIteratorType() {
+        return iteratorType;
+    }
+
+    public void setIteratorType(ShardIteratorType iteratorType) {
+        this.iteratorType = iteratorType;
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/78fd81e5/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-ddbstream
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-ddbstream b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-ddbstream
new file mode 100644
index 0000000..48a8509
--- /dev/null
+++ b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-ddbstream
@@ -0,0 +1,18 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+class=org.apache.camel.component.aws.ddbstream.DdbStreamComponent


[6/7] camel git commit: Add an understanding of the shard list to the main dynamodbstream consumer.

Posted by da...@apache.org.
Add an understanding of the shard list to the main dynamodbstream consumer.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ceaef105
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ceaef105
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ceaef105

Branch: refs/heads/master
Commit: ceaef105323d031ef92751d2744ad7a0f0be2c08
Parents: fdadd15
Author: Candle <ca...@candle.me.uk>
Authored: Fri Dec 11 16:25:48 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 16 14:19:12 2015 +0100

----------------------------------------------------------------------
 .../component/aws/ddbstream/DdbStreamConsumer.java   | 15 ++++++++++++++-
 .../camel/component/aws/ddbstream/ShardList.java     | 11 +++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ceaef105/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
index 88d2ba5..d4c9ac7 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
@@ -26,6 +26,7 @@ import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult;
 import com.amazonaws.services.dynamodbv2.model.ListStreamsRequest;
 import com.amazonaws.services.dynamodbv2.model.ListStreamsResult;
 import com.amazonaws.services.dynamodbv2.model.Record;
+import com.amazonaws.services.dynamodbv2.model.Shard;
 import java.util.ArrayDeque;
 import java.util.List;
 import java.util.Queue;
@@ -42,6 +43,8 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer {
     private static final Logger LOG = LoggerFactory.getLogger(DdbStreamConsumer.class);
 
     private String currentShardIterator = null;
+    private Shard currentShard;
+    private ShardList shardList = new ShardList();
 
     public DdbStreamConsumer(DdbStreamEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -105,10 +108,20 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer {
                     .withStreamArn(streamArn)
                     ;
             DescribeStreamResult res1 = getClient().describeStream(req1);
+            shardList.addAll(res1.getStreamDescription().getShards());
+
+            LOG.trace("Current shard is: {} (in {})", currentShard, shardList);
+            if (currentShard == null) {
+                currentShard = shardList.first();
+            } else {
+                currentShard = shardList.nextAfter(currentShard);
+            }
+            shardList.removeOlderThan(currentShard);
+            LOG.trace("Next shard is: {} (in {})", currentShard, shardList);
 
             GetShardIteratorRequest req = new GetShardIteratorRequest()
                     .withStreamArn(streamArn)
-                    .withShardId(res1.getStreamDescription().getShards().get(0).getShardId()) // XXX only uses the first shard
+                    .withShardId(currentShard.getShardId())
                     .withShardIteratorType(getEndpoint().getIteratorType())
                     ;
             GetShardIteratorResult result = getClient().getShardIterator(req);

http://git-wip-us.apache.org/repos/asf/camel/blob/ceaef105/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
index fb188f4..a85e703 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
@@ -4,8 +4,11 @@ import com.amazonaws.services.dynamodbv2.model.Shard;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class ShardList {
+    private final Logger LOG = LoggerFactory.getLogger(ShardList.class);
 
     private final Map<String, Shard> shards = new HashMap<>();
 
@@ -45,13 +48,21 @@ class ShardList {
     void removeOlderThan(Shard removeBefore) {
         String current = removeBefore.getParentShardId();
 
+        int removedShards = 0;
         while (current != null) {
             Shard s = shards.remove(current);
             if (s == null) {
                 current = null;
             } else {
+                removedShards++;
                 current = s.getParentShardId();
             }
         }
+        LOG.trace("removed {} shards from the store, new size is {}", removedShards, shards.size());
+    }
+
+    @Override
+    public String toString() {
+        return "ShardList{" + "shards=" + shards + '}';
     }
 }
\ No newline at end of file


[4/7] camel git commit: Added more documentation.

Posted by da...@apache.org.
Added more documentation.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/71808938
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/71808938
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/71808938

Branch: refs/heads/master
Commit: 7180893861195c2971f046f1a4b403a80c27ec28
Parents: 51e251c
Author: Candle <ca...@candle.me.uk>
Authored: Wed Dec 16 09:05:00 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 16 14:19:12 2015 +0100

----------------------------------------------------------------------
 .../component/aws/ddbstream/DdbStreamEndpoint.java  | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/71808938/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
index dfcc6cc..66a7461 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
@@ -29,10 +29,10 @@ import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
 
-@UriEndpoint(scheme = "aws-ddbstream", title = "AWS Kinesis", syntax = "aws-ddbstream:tableName", consumerClass = DdbStreamConsumer.class, label = "cloud,messaging,streams")
+@UriEndpoint(scheme = "aws-ddbstream", title = "AWS DynamoDB Streams", consumerOnly = true, syntax = "aws-ddbstream:tableName", consumerClass = DdbStreamConsumer.class, label = "cloud,messaging,streams")
 public class DdbStreamEndpoint extends ScheduledPollEndpoint {
 
-    @UriPath(label = "consumer,producer", description = "Name of the dynamodb table")
+    @UriPath(label = "consumer", description = "Name of the dynamodb table")
     @Metadata(required = "true")
     private String tableName;
 
@@ -44,8 +44,18 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint {
     @UriParam(label = "consumer", description = "Maximum number of records that will be fetched in each poll")
     private int maxResultsPerRequest = 100;
 
-    @UriParam(label = "consumer", description = "Defines where in the DynaboDB stream to start getting records", defaultValue = "LATEST")
+    @UriParam(label = "consumer", description = "Defines where in the DynaboDB stream"
+            + " to start getting records. Note that using TRIM_HORIZON can cause a"
+            + " significant delay before the stream has caught up to real-time."
+            + " Currently only LATEST and TRIM_HORIZON are supported.",
+            defaultValue = "LATEST")
     private ShardIteratorType iteratorType = ShardIteratorType.LATEST;
+    // TODO add the ability to use ShardIteratorType.{AT,AFTER}_SEQUENCE_NUMBER
+    // by specifying either a sequence number itself or a bean to fetch the
+    // sequence number from persistant storage or somewhere else.
+    // This can be done by having the type of the parameter an interface
+    // and supplying a default implementation and a converter from a long/String
+    // to an instance of this interface.
 
     public DdbStreamEndpoint(String uri, String tableName, DdbStreamComponent component) {
         super(uri, component);


[5/7] camel git commit: Update defaults to more sensible defaults.

Posted by da...@apache.org.
Update defaults to more sensible defaults.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fdadd159
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fdadd159
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fdadd159

Branch: refs/heads/master
Commit: fdadd1594b26c63ca22c67044b2fb9fdadd58c5a
Parents: 8f5136a
Author: Candle <ca...@candle.me.uk>
Authored: Fri Dec 11 16:25:19 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 16 14:19:12 2015 +0100

----------------------------------------------------------------------
 .../camel/component/aws/ddbstream/DdbStreamEndpoint.java       | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/fdadd159/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
index 18c042d..35812ce 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
@@ -42,10 +42,10 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint {
     private AmazonDynamoDBStreams amazonDynamoDbStreamsClient;
 
     @UriParam(label = "consumer", description = "Maximum number of records that will be fetched in each poll")
-    private int maxResultsPerRequest = 1;
+    private int maxResultsPerRequest = 100;
 
-    @UriParam(label = "consumer", description = "Defines where in the DynaboDB stream to start getting records")
-    private ShardIteratorType iteratorType = ShardIteratorType.TRIM_HORIZON;
+    @UriParam(label = "consumer", description = "Defines where in the DynaboDB stream to start getting records", defaultValue = "LATEST")
+    private ShardIteratorType iteratorType = ShardIteratorType.LATEST;
 
     public DdbStreamEndpoint(String uri, String tableName, DdbStreamComponent component) {
         super(uri, component);


[7/7] camel git commit: Correctly support the LATEST shard iterator type by starting with the last shard in the stream description.

Posted by da...@apache.org.
Correctly support the LATEST shard iterator type by starting with the last shard in the stream description.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e3b86b97
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e3b86b97
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e3b86b97

Branch: refs/heads/master
Commit: e3b86b977bbcda16d35c936dab77ed7b07f90e6e
Parents: 7180893
Author: Candle <ca...@candle.me.uk>
Authored: Wed Dec 16 10:34:21 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 16 14:19:13 2015 +0100

----------------------------------------------------------------------
 .../component/aws/ddbstream/DdbStreamConsumer.java  | 10 +++++++++-
 .../component/aws/ddbstream/DdbStreamEndpoint.java  | 16 +++++++++++++++-
 .../camel/component/aws/ddbstream/ShardList.java    | 13 +++++++++++++
 .../component/aws/ddbstream/ShardListTest.java      |  8 ++++++++
 4 files changed, 45 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e3b86b97/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
index 0a6a83c..f5223c0 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
@@ -107,7 +107,15 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer {
 
             LOG.trace("Current shard is: {} (in {})", currentShard, shardList);
             if (currentShard == null) {
-                currentShard = shardList.first();
+                switch(getEndpoint().getIteratorType()) {
+                case TRIM_HORIZON:
+                    currentShard = shardList.first();
+                    break;
+                default:
+                case LATEST:
+                    currentShard = shardList.last();
+                    break;
+                }
             } else {
                 currentShard = shardList.nextAfter(currentShard);
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/e3b86b97/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
index 66a7461..543c432 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
@@ -29,7 +29,9 @@ import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
 
-@UriEndpoint(scheme = "aws-ddbstream", title = "AWS DynamoDB Streams", consumerOnly = true, syntax = "aws-ddbstream:tableName", consumerClass = DdbStreamConsumer.class, label = "cloud,messaging,streams")
+@UriEndpoint(scheme = "aws-ddbstream", title = "AWS DynamoDB Streams",
+        consumerOnly = true, syntax = "aws-ddbstream:tableName",
+        consumerClass = DdbStreamConsumer.class, label = "cloud,messaging,streams")
 public class DdbStreamEndpoint extends ScheduledPollEndpoint {
 
     @UriPath(label = "consumer", description = "Name of the dynamodb table")
@@ -56,6 +58,8 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint {
     // This can be done by having the type of the parameter an interface
     // and supplying a default implementation and a converter from a long/String
     // to an instance of this interface.
+    // Note that the shard list needs to have the ability to start at the shard
+    // that includes the supplied sequence number
 
     public DdbStreamEndpoint(String uri, String tableName, DdbStreamComponent component) {
         super(uri, component);
@@ -86,6 +90,16 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint {
         return true;
     }
 
+    @Override
+    public String toString() {
+        return "DdbStreamEndpoint{"
+                + "tableName=" + tableName
+                + ", amazonDynamoDbStreamsClient=[redacted], maxResultsPerRequest=" + maxResultsPerRequest
+                + ", iteratorType="
+                + iteratorType + ", uri=" + getEndpointUri()
+                + '}';
+    }
+
     AmazonDynamoDBStreams getClient() {
         return amazonDynamoDbStreamsClient;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/e3b86b97/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
index 6e804f5..a0df179 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
@@ -57,6 +57,19 @@ class ShardList {
         throw new IllegalStateException("Unable to find an unparented shard in " + shards);
     }
 
+    Shard last() {
+        Map<String, Shard> shardsByParent = new HashMap<>();
+        for (Shard shard : shards.values()) {
+            shardsByParent.put(shard.getParentShardId(), shard);
+        }
+        for (Shard shard : shards.values()) {
+            if (!shardsByParent.containsKey(shard.getShardId())) {
+                return shard;
+            }
+        }
+        throw new IllegalStateException("Unable to find a shard with no children " + shards);
+    }
+
     /**
      * Removes shards that are older than the provided shard.
      * Does not remove the provided shard.

http://git-wip-us.apache.org/repos/asf/camel/blob/e3b86b97/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
index 60f3d46..1b7249a 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
@@ -102,6 +102,14 @@ public class ShardListTest {
     }
 
     @Test
+    public void lastShardGetsTheShardWithNoChildren() throws Exception {
+        ShardList shards = new ShardList();
+        shards.addAll(createShards("a", "b", "c", "d"));
+
+        assertThat(shards.last().getShardId(), is("d"));
+    }
+
+    @Test
     public void removingShards() throws Exception {
         ShardList shards = new ShardList();
         shards.addAll(createShards(null, "a", "b", "c", "d"));


[3/7] camel git commit: Fixed CS.

Posted by da...@apache.org.
Fixed CS.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/51e251c7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/51e251c7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/51e251c7

Branch: refs/heads/master
Commit: 51e251c700dfd1bdb0f443d9e42d1281890338dd
Parents: ceaef10
Author: Candle <ca...@candle.me.uk>
Authored: Tue Dec 15 18:22:20 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 16 14:19:12 2015 +0100

----------------------------------------------------------------------
 .../aws/ddbstream/DdbStreamConsumer.java        | 26 ++++++++------------
 .../aws/ddbstream/DdbStreamEndpoint.java        |  2 --
 .../component/aws/ddbstream/ShardList.java      | 23 ++++++++++++++---
 .../component/aws/ddbstream/ShardListTest.java  | 22 +++++++++++++++--
 4 files changed, 50 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/51e251c7/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
index d4c9ac7..0a6a83c 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
@@ -16,6 +16,10 @@
  */
 package org.apache.camel.component.aws.ddbstream;
 
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Queue;
+
 import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
 import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest;
 import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult;
@@ -27,9 +31,6 @@ import com.amazonaws.services.dynamodbv2.model.ListStreamsRequest;
 import com.amazonaws.services.dynamodbv2.model.ListStreamsResult;
 import com.amazonaws.services.dynamodbv2.model.Record;
 import com.amazonaws.services.dynamodbv2.model.Shard;
-import java.util.ArrayDeque;
-import java.util.List;
-import java.util.Queue;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -42,23 +43,19 @@ import org.slf4j.LoggerFactory;
 public class DdbStreamConsumer extends ScheduledBatchPollingConsumer {
     private static final Logger LOG = LoggerFactory.getLogger(DdbStreamConsumer.class);
 
-    private String currentShardIterator = null;
+    private String currentShardIterator;
     private Shard currentShard;
-    private ShardList shardList = new ShardList();
+    private final ShardList shardList = new ShardList();
 
     public DdbStreamConsumer(DdbStreamEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
     }
 
-    /*
-     * Returns the number of messages polled.
-     */
     @Override
     protected int poll() throws Exception {
         GetRecordsRequest req = new GetRecordsRequest()
                 .withShardIterator(getShardItertor())
-                .withLimit(getEndpoint().getMaxResultsPerRequest())
-                ;
+                .withLimit(getEndpoint().getMaxResultsPerRequest());
         GetRecordsResult result = getClient().getRecords(req);
 
         Queue<Exchange> exchanges = createExchanges(result.getRecords());
@@ -100,13 +97,11 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer {
         // either return a cached one or get a new one via a GetShardIterator request.
         if (currentShardIterator == null) {
             ListStreamsRequest req0 = new ListStreamsRequest()
-                    .withTableName(getEndpoint().getTableName())
-                    ;
+                    .withTableName(getEndpoint().getTableName());
             ListStreamsResult res0 = getClient().listStreams(req0);
             final String streamArn = res0.getStreams().get(0).getStreamArn(); // XXX assumes there is only one stream
             DescribeStreamRequest req1 = new DescribeStreamRequest()
-                    .withStreamArn(streamArn)
-                    ;
+                    .withStreamArn(streamArn);
             DescribeStreamResult res1 = getClient().describeStream(req1);
             shardList.addAll(res1.getStreamDescription().getShards());
 
@@ -122,8 +117,7 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer {
             GetShardIteratorRequest req = new GetShardIteratorRequest()
                     .withStreamArn(streamArn)
                     .withShardId(currentShard.getShardId())
-                    .withShardIteratorType(getEndpoint().getIteratorType())
-                    ;
+                    .withShardIteratorType(getEndpoint().getIteratorType());
             GetShardIteratorResult result = getClient().getShardIterator(req);
             currentShardIterator = result.getShardIterator();
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/51e251c7/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
index 35812ce..dfcc6cc 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
@@ -73,7 +73,6 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint {
 
     @Override
     public boolean isSingleton() {
-        // probably right.
         return true;
     }
 
@@ -81,7 +80,6 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint {
         return amazonDynamoDbStreamsClient;
     }
 
-    // required for injection.
     public AmazonDynamoDBStreams getAmazonDynamoDBStreamsClient() {
         return amazonDynamoDbStreamsClient;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/51e251c7/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
index a85e703..6e804f5 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
@@ -1,14 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.camel.component.aws.ddbstream;
 
-import com.amazonaws.services.dynamodbv2.model.Shard;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+
+import com.amazonaws.services.dynamodbv2.model.Shard;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 class ShardList {
-    private final Logger LOG = LoggerFactory.getLogger(ShardList.class);
+    private final Logger log = LoggerFactory.getLogger(ShardList.class);
 
     private final Map<String, Shard> shards = new HashMap<>();
 
@@ -58,7 +75,7 @@ class ShardList {
                 current = s.getParentShardId();
             }
         }
-        LOG.trace("removed {} shards from the store, new size is {}", removedShards, shards.size());
+        log.trace("removed {} shards from the store, new size is {}", removedShards, shards.size());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/51e251c7/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
index 773f4a0..60f3d46 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
@@ -1,11 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.camel.component.aws.ddbstream;
 
-import com.amazonaws.services.dynamodbv2.model.Shard;
+
 import java.util.ArrayList;
 import java.util.List;
+import com.amazonaws.services.dynamodbv2.model.Shard;
+
+import org.junit.Test;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
-import org.junit.Test;
 
 public class ShardListTest {