You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/12/16 14:22:47 UTC

[2/7] camel git commit: Added basic DynamoDb Stream component.

Added basic DynamoDb Stream component.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/78fd81e5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/78fd81e5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/78fd81e5

Branch: refs/heads/master
Commit: 78fd81e5f7861e6fbde3fb9d63519bc1e775c93c
Parents: 3da84a6
Author: Candle <ca...@candle.me.uk>
Authored: Mon Dec 7 09:33:10 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 16 14:19:11 2015 +0100

----------------------------------------------------------------------
 .../aws/ddbstream/DdbStreamComponent.java       |  44 +++++++
 .../aws/ddbstream/DdbStreamConsumer.java        | 128 +++++++++++++++++++
 .../aws/ddbstream/DdbStreamEndpoint.java        | 117 +++++++++++++++++
 .../org/apache/camel/component/aws-ddbstream    |  18 +++
 4 files changed, 307 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/78fd81e5/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamComponent.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamComponent.java
new file mode 100644
index 0000000..559597b
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamComponent.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+import java.util.Map;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.UriEndpointComponent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Camel component that creates {@link DdbStreamEndpoint} instances for URIs of the
+ * form <code>aws-ddbstream:tableName</code>.
+ */
+public class DdbStreamComponent extends UriEndpointComponent {
+    private static final Logger LOG = LoggerFactory.getLogger(DdbStreamComponent.class);
+
+    /**
+     * Creates the component without binding it to a Camel context.
+     */
+    public DdbStreamComponent() {
+        super(DdbStreamEndpoint.class);
+    }
+
+    /**
+     * Creates the component bound to the given Camel context.
+     *
+     * @param context the Camel context this component belongs to
+     */
+    public DdbStreamComponent(CamelContext context) {
+        super(context, DdbStreamEndpoint.class);
+    }
+
+    /**
+     * Builds a {@link DdbStreamEndpoint} for the given URI.
+     *
+     * @param uri        the full endpoint URI
+     * @param remaining  the URI path, passed to the endpoint as the table name
+     * @param parameters the URI query parameters; not consumed by this method
+     * @return the newly created endpoint
+     */
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        DdbStreamEndpoint endpoint = new DdbStreamEndpoint(uri, remaining, this);
+
+        // Hand the endpoint itself to the logger so toString() is only evaluated when debug is enabled.
+        LOG.debug("Created endpoint: {}", endpoint);
+        return endpoint;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/78fd81e5/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
new file mode 100644
index 0000000..88d2ba5
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
+import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest;
+import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult;
+import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
+import com.amazonaws.services.dynamodbv2.model.GetRecordsResult;
+import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;
+import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult;
+import com.amazonaws.services.dynamodbv2.model.ListStreamsRequest;
+import com.amazonaws.services.dynamodbv2.model.ListStreamsResult;
+import com.amazonaws.services.dynamodbv2.model.Record;
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Queue;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.ScheduledBatchPollingConsumer;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scheduled polling consumer that fetches records from a DynamoDB stream and hands
+ * each record to the route as the body of its own {@link Exchange}.
+ * <p>
+ * Known limitations: only the first stream listed for the table, and only the first
+ * shard of that stream, are read.
+ */
+public class DdbStreamConsumer extends ScheduledBatchPollingConsumer {
+    private static final Logger LOG = LoggerFactory.getLogger(DdbStreamConsumer.class);
+
+    // Iterator to use for the next GetRecords call; null means a new one must be requested.
+    private String currentShardIterator;
+
+    /**
+     * @param endpoint  the endpoint that supplies the client and the polling options
+     * @param processor the processor that receives the created exchanges
+     */
+    public DdbStreamConsumer(DdbStreamEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+    }
+
+    /**
+     * Fetches the next batch of records from the stream and processes them.
+     *
+     * @return the number of messages polled
+     */
+    @Override
+    protected int poll() throws Exception {
+        GetRecordsRequest req = new GetRecordsRequest()
+                .withShardIterator(getShardIterator())
+                .withLimit(getEndpoint().getMaxResultsPerRequest());
+        GetRecordsResult result = getClient().getRecords(req);
+
+        Queue<Exchange> exchanges = createExchanges(result.getRecords());
+        int processedExchangeCount = processBatch(CastUtils.cast(exchanges));
+
+        // Advance only after the batch was handed off: if processBatch throws, the
+        // iterator is left untouched and the next poll requests the same records again.
+        currentShardIterator = result.getNextShardIterator();
+
+        return processedExchangeCount;
+    }
+
+    /**
+     * Drains the queue, submitting every exchange to the async processor.
+     *
+     * @param exchanges queue of {@link Exchange} objects to process; emptied by this call
+     * @return the number of exchanges submitted
+     */
+    @Override
+    public int processBatch(Queue<Object> exchanges) throws Exception {
+        int processedExchanges = 0;
+        while (!exchanges.isEmpty()) {
+            final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
+
+            LOG.trace("Processing exchange [{}] started.", exchange);
+            getAsyncProcessor().process(exchange, new AsyncCallback() {
+                @Override
+                public void done(boolean doneSync) {
+                    LOG.trace("Processing exchange [{}] done.", exchange);
+                }
+            });
+            processedExchanges++;
+        }
+        return processedExchanges;
+    }
+
+    private AmazonDynamoDBStreams getClient() {
+        return getEndpoint().getClient();
+    }
+
+    @Override
+    public DdbStreamEndpoint getEndpoint() {
+        return (DdbStreamEndpoint) super.getEndpoint();
+    }
+
+    /**
+     * Returns the cached shard iterator, or requests a new one when none is cached.
+     *
+     * @throws IllegalStateException if the table has no stream or the stream has no shards
+     */
+    private String getShardIterator() {
+        if (currentShardIterator == null) {
+            final String tableName = getEndpoint().getTableName();
+            ListStreamsResult streams = getClient().listStreams(
+                    new ListStreamsRequest().withTableName(tableName));
+            if (isNullOrEmpty(streams.getStreams())) {
+                throw new IllegalStateException("No DynamoDB stream found for table: " + tableName);
+            }
+            final String streamArn = streams.getStreams().get(0).getStreamArn(); // XXX assumes there is only one stream
+
+            DescribeStreamResult description = getClient().describeStream(
+                    new DescribeStreamRequest().withStreamArn(streamArn));
+            if (isNullOrEmpty(description.getStreamDescription().getShards())) {
+                throw new IllegalStateException("No shards found in DynamoDB stream: " + streamArn);
+            }
+
+            GetShardIteratorRequest req = new GetShardIteratorRequest()
+                    .withStreamArn(streamArn)
+                    .withShardId(description.getStreamDescription().getShards().get(0).getShardId()) // XXX only uses the first shard
+                    .withShardIteratorType(getEndpoint().getIteratorType());
+            GetShardIteratorResult result = getClient().getShardIterator(req);
+            currentShardIterator = result.getShardIterator();
+        }
+        LOG.trace("Shard Iterator is: {}", currentShardIterator);
+        return currentShardIterator;
+    }
+
+    // True when the list is absent or has no elements.
+    private static boolean isNullOrEmpty(List<?> list) {
+        return list == null || list.isEmpty();
+    }
+
+    // Wraps every record in its own exchange, preserving the order of the records.
+    private Queue<Exchange> createExchanges(List<Record> records) {
+        Queue<Exchange> exchanges = new ArrayDeque<>();
+        for (Record record : records) {
+            exchanges.add(getEndpoint().createExchange(record));
+        }
+        return exchanges;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/78fd81e5/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
new file mode 100644
index 0000000..18c042d
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
+import com.amazonaws.services.dynamodbv2.model.Record;
+import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.ScheduledPollEndpoint;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+
+/**
+ * Consumer-only endpoint that reads records from the stream of a DynamoDB table.
+ * Creating a producer is not supported.
+ */
+@UriEndpoint(scheme = "aws-ddbstream", title = "AWS DynamoDB Streams", syntax = "aws-ddbstream:tableName", consumerClass = DdbStreamConsumer.class, label = "cloud,messaging,streams")
+public class DdbStreamEndpoint extends ScheduledPollEndpoint {
+
+    @UriPath(label = "consumer,producer", description = "Name of the dynamodb table")
+    @Metadata(required = "true")
+    private String tableName;
+
+    // For now, always assume that we've been supplied a client in the Camel registry.
+    @UriParam(label = "consumer", description = "Amazon DynamoDB client to use for all requests for this endpoint")
+    @Metadata(required = "true")
+    private AmazonDynamoDBStreams amazonDynamoDbStreamsClient;
+
+    @UriParam(label = "consumer", description = "Maximum number of records that will be fetched in each poll")
+    private int maxResultsPerRequest = 1;
+
+    @UriParam(label = "consumer", description = "Defines where in the DynamoDB stream to start getting records")
+    private ShardIteratorType iteratorType = ShardIteratorType.TRIM_HORIZON;
+
+    /**
+     * @param uri       the full endpoint URI
+     * @param tableName name of the DynamoDB table whose stream is read
+     * @param component the component that created this endpoint
+     */
+    public DdbStreamEndpoint(String uri, String tableName, DdbStreamComponent component) {
+        super(uri, component);
+        this.tableName = tableName;
+    }
+
+    /**
+     * Always fails: this endpoint has no producer.
+     *
+     * @throws UnsupportedOperationException on every call
+     */
+    @Override
+    public Producer createProducer() throws Exception {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    /**
+     * Creates a {@link DdbStreamConsumer} for this endpoint and configures it from the endpoint's options.
+     *
+     * @param processor the processor that receives the consumed exchanges
+     * @return the configured consumer
+     */
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        DdbStreamConsumer consumer = new DdbStreamConsumer(this, processor);
+        consumer.setSchedulerProperties(getSchedulerProperties());
+        // Apply the consumer options configured on the endpoint URI to the new consumer.
+        configureConsumer(consumer);
+        return consumer;
+    }
+
+    /**
+     * Creates an exchange whose in-message body is the given stream record.
+     */
+    Exchange createExchange(Record record) {
+        Exchange ex = super.createExchange();
+        ex.getIn().setBody(record, Record.class);
+
+        return ex;
+    }
+
+    @Override
+    public boolean isSingleton() {
+        // probably right.
+        return true;
+    }
+
+    // Short internal accessor used by the consumer.
+    AmazonDynamoDBStreams getClient() {
+        return amazonDynamoDbStreamsClient;
+    }
+
+    /**
+     * Getter whose name matches {@link #setAmazonDynamoDbStreamsClient(AmazonDynamoDBStreams)},
+     * so that getter and setter describe the same bean property.
+     */
+    public AmazonDynamoDBStreams getAmazonDynamoDbStreamsClient() {
+        return amazonDynamoDbStreamsClient;
+    }
+
+    /**
+     * Original getter, retained for backward compatibility. Its capitalisation ("DB")
+     * differs from the setter's ("Db"); both getters return the same client.
+     */
+    public AmazonDynamoDBStreams getAmazonDynamoDBStreamsClient() {
+        return amazonDynamoDbStreamsClient;
+    }
+
+    public void setAmazonDynamoDbStreamsClient(AmazonDynamoDBStreams amazonDynamoDbStreamsClient) {
+        this.amazonDynamoDbStreamsClient = amazonDynamoDbStreamsClient;
+    }
+
+    public int getMaxResultsPerRequest() {
+        return maxResultsPerRequest;
+    }
+
+    public void setMaxResultsPerRequest(int maxResultsPerRequest) {
+        this.maxResultsPerRequest = maxResultsPerRequest;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
+    }
+
+    public ShardIteratorType getIteratorType() {
+        return iteratorType;
+    }
+
+    public void setIteratorType(ShardIteratorType iteratorType) {
+        this.iteratorType = iteratorType;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/78fd81e5/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-ddbstream
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-ddbstream b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-ddbstream
new file mode 100644
index 0000000..48a8509
--- /dev/null
+++ b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-ddbstream
@@ -0,0 +1,18 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+class=org.apache.camel.component.aws.ddbstream.DdbStreamComponent