You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/12/16 14:22:47 UTC
[2/7] camel git commit: Added basic DynamoDb Stream component.
Added basic DynamoDb Stream component.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/78fd81e5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/78fd81e5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/78fd81e5
Branch: refs/heads/master
Commit: 78fd81e5f7861e6fbde3fb9d63519bc1e775c93c
Parents: 3da84a6
Author: Candle <ca...@candle.me.uk>
Authored: Mon Dec 7 09:33:10 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 16 14:19:11 2015 +0100
----------------------------------------------------------------------
.../aws/ddbstream/DdbStreamComponent.java | 44 +++++++
.../aws/ddbstream/DdbStreamConsumer.java | 128 +++++++++++++++++++
.../aws/ddbstream/DdbStreamEndpoint.java | 117 +++++++++++++++++
.../org/apache/camel/component/aws-ddbstream | 18 +++
4 files changed, 307 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/78fd81e5/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamComponent.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamComponent.java
new file mode 100644
index 0000000..559597b
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamComponent.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+import java.util.Map;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.UriEndpointComponent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Camel component that creates {@link DdbStreamEndpoint} instances for
+ * {@code aws-ddbstream:tableName} URIs.
+ */
+public class DdbStreamComponent extends UriEndpointComponent {
+    private static final Logger LOG = LoggerFactory.getLogger(DdbStreamComponent.class);
+
+    /**
+     * Creates the component without a Camel context.
+     */
+    public DdbStreamComponent() {
+        super(DdbStreamEndpoint.class);
+    }
+
+    /**
+     * Creates the component bound to the given Camel context.
+     *
+     * @param context the Camel context passed on to the base component
+     */
+    public DdbStreamComponent(CamelContext context) {
+        super(context, DdbStreamEndpoint.class);
+    }
+
+    /**
+     * Builds a {@link DdbStreamEndpoint} for the given URI.
+     *
+     * @param uri        the full endpoint URI
+     * @param remaining  the URI path; handed to the endpoint as the table name
+     * @param parameters the URI query parameters; not read by this method
+     *                   (NOTE(review): presumably bound to the endpoint by the base component - confirm)
+     * @return the newly created endpoint
+     */
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        DdbStreamEndpoint endpoint = new DdbStreamEndpoint(uri, remaining, this);
+
+        // Pass the object itself so its string form is only built when debug logging is enabled.
+        LOG.debug("Created endpoint: {}", endpoint);
+        return endpoint;
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/78fd81e5/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
new file mode 100644
index 0000000..88d2ba5
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
+import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest;
+import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult;
+import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
+import com.amazonaws.services.dynamodbv2.model.GetRecordsResult;
+import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;
+import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult;
+import com.amazonaws.services.dynamodbv2.model.ListStreamsRequest;
+import com.amazonaws.services.dynamodbv2.model.ListStreamsResult;
+import com.amazonaws.services.dynamodbv2.model.Record;
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Queue;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.ScheduledBatchPollingConsumer;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scheduled polling consumer that fetches records from a DynamoDB stream and
+ * hands each record to the route as an {@link Exchange}.
+ * <p>
+ * Limitations of this implementation: only the first stream listed for the
+ * table is used, and only the first shard of that stream is read.
+ */
+public class DdbStreamConsumer extends ScheduledBatchPollingConsumer {
+    private static final Logger LOG = LoggerFactory.getLogger(DdbStreamConsumer.class);
+
+    /** Iterator to use for the next GetRecords call; {@code null} until one has been resolved. */
+    private String currentShardIterator;
+
+    /**
+     * @param endpoint  the endpoint that supplies the client and the polling options
+     * @param processor the processor that receives the created exchanges
+     */
+    public DdbStreamConsumer(DdbStreamEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+    }
+
+    /**
+     * Fetches one page of records from the stream and processes them.
+     *
+     * @return the number of messages polled
+     */
+    @Override
+    protected int poll() throws Exception {
+        GetRecordsRequest req = new GetRecordsRequest()
+                .withShardIterator(getShardIterator())
+                .withLimit(getEndpoint().getMaxResultsPerRequest());
+        GetRecordsResult result = getClient().getRecords(req);
+
+        Queue<Exchange> exchanges = createExchanges(result.getRecords());
+        int processedExchangeCount = processBatch(CastUtils.cast(exchanges));
+
+        // The iterator is advanced only after the batch has been handed off, so an
+        // exception thrown above leaves the position unchanged for the next poll.
+        // NOTE(review): if the service returns a null next iterator, the next poll
+        // resolves a fresh iterator from the endpoint's iterator type - confirm that
+        // this is the intended behaviour once a shard has been closed.
+        currentShardIterator = result.getNextShardIterator();
+
+        return processedExchangeCount;
+    }
+
+    /**
+     * Drains the queue, submitting every exchange to the async processor.
+     *
+     * @param exchanges queue of {@link Exchange} objects; emptied by this call
+     * @return the number of exchanges submitted
+     */
+    @Override
+    public int processBatch(Queue<Object> exchanges) throws Exception {
+        int processedExchanges = 0;
+        while (!exchanges.isEmpty()) {
+            final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
+
+            LOG.trace("Processing exchange [{}] started.", exchange);
+            getAsyncProcessor().process(exchange, new AsyncCallback() {
+                @Override
+                public void done(boolean doneSync) {
+                    LOG.trace("Processing exchange [{}] done.", exchange);
+                }
+            });
+            processedExchanges++;
+        }
+        return processedExchanges;
+    }
+
+    private AmazonDynamoDBStreams getClient() {
+        return getEndpoint().getClient();
+    }
+
+    @Override
+    public DdbStreamEndpoint getEndpoint() {
+        return (DdbStreamEndpoint) super.getEndpoint();
+    }
+
+    /**
+     * Returns the cached shard iterator, or resolves a new one through
+     * ListStreams, DescribeStream and GetShardIterator requests.
+     *
+     * @throws IllegalStateException if the table has no stream or the stream has no shards
+     */
+    private String getShardIterator() {
+        if (currentShardIterator == null) {
+            final String tableName = getEndpoint().getTableName();
+            ListStreamsRequest req0 = new ListStreamsRequest()
+                    .withTableName(tableName);
+            ListStreamsResult res0 = getClient().listStreams(req0);
+            if (res0.getStreams() == null || res0.getStreams().isEmpty()) {
+                throw new IllegalStateException("No DynamoDB stream found for table: " + tableName);
+            }
+            // XXX assumes there is only one stream
+            final String streamArn = res0.getStreams().get(0).getStreamArn();
+
+            DescribeStreamRequest req1 = new DescribeStreamRequest()
+                    .withStreamArn(streamArn);
+            DescribeStreamResult res1 = getClient().describeStream(req1);
+            if (res1.getStreamDescription().getShards() == null
+                    || res1.getStreamDescription().getShards().isEmpty()) {
+                throw new IllegalStateException("No shards found for DynamoDB stream: " + streamArn);
+            }
+
+            GetShardIteratorRequest req = new GetShardIteratorRequest()
+                    .withStreamArn(streamArn)
+                    // XXX only uses the first shard
+                    .withShardId(res1.getStreamDescription().getShards().get(0).getShardId())
+                    .withShardIteratorType(getEndpoint().getIteratorType());
+            GetShardIteratorResult result = getClient().getShardIterator(req);
+            currentShardIterator = result.getShardIterator();
+        }
+        LOG.trace("Shard Iterator is: {}", currentShardIterator);
+        return currentShardIterator;
+    }
+
+    /**
+     * Wraps each record in an exchange created by the endpoint, preserving order.
+     */
+    private Queue<Exchange> createExchanges(List<Record> records) {
+        Queue<Exchange> exchanges = new ArrayDeque<>();
+        for (Record record : records) {
+            exchanges.add(getEndpoint().createExchange(record));
+        }
+        return exchanges;
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/78fd81e5/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
new file mode 100644
index 0000000..18c042d
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
+import com.amazonaws.services.dynamodbv2.model.Record;
+import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.ScheduledPollEndpoint;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+
+/**
+ * Consumer-only endpoint for {@code aws-ddbstream:tableName} URIs. It holds the
+ * DynamoDB Streams client and the polling options used by {@link DdbStreamConsumer}.
+ */
+@UriEndpoint(scheme = "aws-ddbstream", title = "AWS DynamoDB Streams", syntax = "aws-ddbstream:tableName", consumerClass = DdbStreamConsumer.class, label = "cloud,messaging,streams")
+public class DdbStreamEndpoint extends ScheduledPollEndpoint {
+
+    @UriPath(label = "consumer,producer", description = "Name of the dynamodb table")
+    @Metadata(required = "true")
+    private String tableName;
+
+    // For now, always assume that we've been supplied a client in the Camel registry.
+    @UriParam(label = "consumer", description = "Amazon DynamoDB client to use for all requests for this endpoint")
+    @Metadata(required = "true")
+    private AmazonDynamoDBStreams amazonDynamoDbStreamsClient;
+
+    @UriParam(label = "consumer", description = "Maximum number of records that will be fetched in each poll")
+    private int maxResultsPerRequest = 1;
+
+    @UriParam(label = "consumer", description = "Defines where in the DynamoDB stream to start getting records")
+    private ShardIteratorType iteratorType = ShardIteratorType.TRIM_HORIZON;
+
+    /**
+     * @param uri       the full endpoint URI
+     * @param tableName the name of the DynamoDB table whose stream is consumed
+     * @param component the component that created this endpoint
+     */
+    public DdbStreamEndpoint(String uri, String tableName, DdbStreamComponent component) {
+        super(uri, component);
+        this.tableName = tableName;
+    }
+
+    /**
+     * Producers are not supported by this endpoint.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public Producer createProducer() throws Exception {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    /**
+     * Creates a {@link DdbStreamConsumer} that delivers stream records to the given processor.
+     */
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        DdbStreamConsumer consumer = new DdbStreamConsumer(this, processor);
+        consumer.setSchedulerProperties(consumer.getEndpoint().getSchedulerProperties());
+        // Apply the consumer options parsed from the endpoint URI to the new consumer.
+        configureConsumer(consumer);
+        return consumer;
+    }
+
+    /**
+     * Creates an exchange whose in-message body is the given stream record.
+     */
+    Exchange createExchange(Record record) {
+        Exchange ex = super.createExchange();
+        ex.getIn().setBody(record, Record.class);
+
+        return ex;
+    }
+
+    @Override
+    public boolean isSingleton() {
+        // probably right.
+        return true;
+    }
+
+    AmazonDynamoDBStreams getClient() {
+        return amazonDynamoDbStreamsClient;
+    }
+
+    /**
+     * Getter whose name matches the {@code amazonDynamoDbStreamsClient} field and its setter.
+     */
+    public AmazonDynamoDBStreams getAmazonDynamoDbStreamsClient() {
+        return amazonDynamoDbStreamsClient;
+    }
+
+    /**
+     * Kept for backward compatibility; the capitalisation differs from the setter.
+     *
+     * @deprecated use {@link #getAmazonDynamoDbStreamsClient()}
+     */
+    @Deprecated
+    public AmazonDynamoDBStreams getAmazonDynamoDBStreamsClient() {
+        return getAmazonDynamoDbStreamsClient();
+    }
+
+    public void setAmazonDynamoDbStreamsClient(AmazonDynamoDBStreams amazonDynamoDbStreamsClient) {
+        this.amazonDynamoDbStreamsClient = amazonDynamoDbStreamsClient;
+    }
+
+    public int getMaxResultsPerRequest() {
+        return maxResultsPerRequest;
+    }
+
+    public void setMaxResultsPerRequest(int maxResultsPerRequest) {
+        this.maxResultsPerRequest = maxResultsPerRequest;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
+    }
+
+    public ShardIteratorType getIteratorType() {
+        return iteratorType;
+    }
+
+    public void setIteratorType(ShardIteratorType iteratorType) {
+        this.iteratorType = iteratorType;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/78fd81e5/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-ddbstream
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-ddbstream b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-ddbstream
new file mode 100644
index 0000000..48a8509
--- /dev/null
+++ b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-ddbstream
@@ -0,0 +1,18 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+class=org.apache.camel.component.aws.ddbstream.DdbStreamComponent