You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/02/10 16:06:19 UTC

[camel] 01/01: CAMEL-14519 - Create an AWS-DDB component based on SDK v2

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch CAMEL-14519
in repository https://gitbox.apache.org/repos/asf/camel.git

commit baf8a318aefbae2c6a1e950c5166747d74b48978
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Mon Feb 10 17:01:55 2020 +0100

    CAMEL-14519 - Create an AWS-DDB component based on SDK v2
---
 components/camel-aws2-ddb/pom.xml                  |  86 +++++
 .../aws2/ddb/Ddb2ComponentConfigurer.java          |  33 ++
 .../component/aws2/ddb/Ddb2EndpointConfigurer.java |  52 +++
 .../ddbstream/Ddb2StreamComponentConfigurer.java   |  33 ++
 .../ddbstream/Ddb2StreamEndpointConfigurer.java    |  80 ++++
 .../StringSequenceNumberConverterLoader.java       |  40 ++
 .../services/org/apache/camel/TypeConverterLoader  |   2 +
 .../org/apache/camel/configurer/aws2-ddb-component |   2 +
 .../org/apache/camel/configurer/aws2-ddb-endpoint  |   2 +
 .../camel/configurer/aws2-ddbstream-component      |   2 +
 .../camel/configurer/aws2-ddbstream-endpoint       |   2 +
 .../services/org/apache/camel/other.properties     |   7 +
 .../src/generated/resources/aws2-ddb.json          |  12 +
 .../apache/camel/component/aws2/ddb/aws2-ddb.json  |  48 +++
 .../component/aws2/ddbstream/aws2-ddbstream.json   |  63 ++++
 .../src/main/docs/aws2-ddb-component.adoc          | 407 +++++++++++++++++++++
 .../src/main/docs/aws2-ddbstream-component.adoc    | 252 +++++++++++++
 .../component/aws2/ddb/AbstractDdbCommand.java     | 100 +++++
 .../component/aws2/ddb/BatchGetItemsCommand.java   |  51 +++
 .../camel/component/aws2/ddb/Ddb2Component.java    | 123 +++++++
 .../aws2/ddb/Ddb2ComponentVerifierExtension.java   |  99 +++++
 .../component/aws2/ddb/Ddb2Configuration.java      | 226 ++++++++++++
 .../camel/component/aws2/ddb/Ddb2Constants.java    |  66 ++++
 .../camel/component/aws2/ddb/Ddb2Endpoint.java     | 200 ++++++++++
 .../camel/component/aws2/ddb/Ddb2Operations.java   |  30 ++
 .../camel/component/aws2/ddb/Ddb2Producer.java     |  95 +++++
 .../component/aws2/ddb/DeleteItemCommand.java      |  41 +++
 .../component/aws2/ddb/DeleteTableCommand.java     |  50 +++
 .../component/aws2/ddb/DescribeTableCommand.java   |  50 +++
 .../camel/component/aws2/ddb/GetItemCommand.java   |  40 ++
 .../camel/component/aws2/ddb/PutItemCommand.java   |  41 +++
 .../camel/component/aws2/ddb/QueryCommand.java     |  73 ++++
 .../camel/component/aws2/ddb/ScanCommand.java      |  56 +++
 .../component/aws2/ddb/UpdateItemCommand.java      |  51 +++
 .../component/aws2/ddb/UpdateTableCommand.java     |  50 +++
 .../aws2/ddbstream/BigIntComparisons.java          |  44 +++
 .../aws2/ddbstream/Ddb2StreamComponent.java        | 122 ++++++
 .../Ddb2StreamComponentVerifierExtension.java      |  87 +++++
 .../aws2/ddbstream/Ddb2StreamConfiguration.java    | 167 +++++++++
 .../aws2/ddbstream/Ddb2StreamConsumer.java         | 137 +++++++
 .../aws2/ddbstream/Ddb2StreamEndpoint.java         | 163 +++++++++
 .../aws2/ddbstream/SequenceNumberProvider.java     |  21 ++
 .../aws2/ddbstream/ShardIteratorHandler.java       | 142 +++++++
 .../camel/component/aws2/ddbstream/ShardList.java  | 148 ++++++++
 .../ddbstream/StaticSequenceNumberProvider.java    |  31 ++
 .../ddbstream/StringSequenceNumberConverter.java   |  32 ++
 .../component/aws2/ddb/AmazonDDBClientMock.java    | 225 ++++++++++++
 .../aws2/ddb/BatchGetItemsCommandTest.java         |  78 ++++
 .../aws2/ddb/DdbComponentConfigurationTest.java    |  70 ++++
 .../aws2/ddb/DdbComponentRegistryClientTest.java   |  43 +++
 .../camel/component/aws2/ddb/DdbComponentTest.java |  73 ++++
 .../ddb/DdbComponentVerifierExtensionTest.java     |  71 ++++
 .../component/aws2/ddb/DeleteItemCommandTest.java  |  76 ++++
 .../component/aws2/ddb/DeleteTableCommandTest.java |  65 ++++
 .../aws2/ddb/DescribeTableCommandTest.java         |  69 ++++
 .../component/aws2/ddb/GetItemCommandTest.java     |  72 ++++
 .../component/aws2/ddb/PutItemCommandTest.java     |  71 ++++
 .../camel/component/aws2/ddb/QueryCommandTest.java |  91 +++++
 .../camel/component/aws2/ddb/ScanCommandTest.java  |  79 ++++
 .../component/aws2/ddb/UpdateItemCommandTest.java  |  84 +++++
 .../component/aws2/ddb/UpdateTableCommandTest.java |  56 +++
 .../integration/DdbComponentIntegrationTest.java   | 150 ++++++++
 .../DdbStreamComponentConfigurationTest.java       |  80 ++++
 .../DdbStreamComponentVerifierExtensionTest.java   |  71 ++++
 .../src/test/resources/log4j2.properties           |  28 ++
 65 files changed, 5211 insertions(+)

diff --git a/components/camel-aws2-ddb/pom.xml b/components/camel-aws2-ddb/pom.xml
new file mode 100644
index 0000000..1b5a4c8
--- /dev/null
+++ b/components/camel-aws2-ddb/pom.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.camel</groupId>
+        <artifactId>components</artifactId>
+        <version>3.1.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>camel-aws2-ddb</artifactId>
+    <packaging>jar</packaging>
+
+    <name>Camel :: AWS2 DDB</name>
+    <description>A Camel Amazon DynamoDB Web Service Component Version 2</description>
+
+    <properties>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-support</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>dynamodb</artifactId>
+            <version>${aws-java-sdk2-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>apache-client</artifactId>
+            <version>${aws-java-sdk2-version}</version>
+        </dependency>
+
+        <!-- for testing -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-test-spring</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/components/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddb/Ddb2ComponentConfigurer.java b/components/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddb/Ddb2ComponentConfigurer.java
new file mode 100644
index 0000000..bf278bd
--- /dev/null
+++ b/components/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddb/Ddb2ComponentConfigurer.java
@@ -0,0 +1,33 @@
+/* Generated by camel build tools - do NOT edit this file! */
+package org.apache.camel.component.aws2.ddb;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.GeneratedPropertyConfigurer;
+import org.apache.camel.support.component.PropertyConfigurerSupport;
+
+/**
+ * Generated by camel build tools - do NOT edit this file!
+ */
+@SuppressWarnings("unchecked")
+public class Ddb2ComponentConfigurer extends PropertyConfigurerSupport implements GeneratedPropertyConfigurer {
+
+    @Override
+    public boolean configure(CamelContext camelContext, Object obj, String name, Object value, boolean ignoreCase) {
+        Ddb2Component target = (Ddb2Component) obj;
+        switch (ignoreCase ? name.toLowerCase() : name) {
+        case "accesskey":
+        case "accessKey": target.setAccessKey(property(camelContext, java.lang.String.class, value)); return true;
+        case "lazystartproducer":
+        case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true;
+        case "region": target.setRegion(property(camelContext, java.lang.String.class, value)); return true;
+        case "secretkey":
+        case "secretKey": target.setSecretKey(property(camelContext, java.lang.String.class, value)); return true;
+        case "basicpropertybinding":
+        case "basicPropertyBinding": target.setBasicPropertyBinding(property(camelContext, boolean.class, value)); return true;
+        case "configuration": target.setConfiguration(property(camelContext, org.apache.camel.component.aws2.ddb.Ddb2Configuration.class, value)); return true;
+        default: return false;
+        }
+    }
+
+}
+
diff --git a/components/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddb/Ddb2EndpointConfigurer.java b/components/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddb/Ddb2EndpointConfigurer.java
new file mode 100644
index 0000000..63cea29
--- /dev/null
+++ b/components/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddb/Ddb2EndpointConfigurer.java
@@ -0,0 +1,52 @@
+/* Generated by camel build tools - do NOT edit this file! */
+package org.apache.camel.component.aws2.ddb;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.GeneratedPropertyConfigurer;
+import org.apache.camel.support.component.PropertyConfigurerSupport;
+
+/**
+ * Generated by camel build tools - do NOT edit this file!
+ */
+@SuppressWarnings("unchecked")
+public class Ddb2EndpointConfigurer extends PropertyConfigurerSupport implements GeneratedPropertyConfigurer {
+
+    @Override
+    public boolean configure(CamelContext camelContext, Object obj, String name, Object value, boolean ignoreCase) {
+        Ddb2Endpoint target = (Ddb2Endpoint) obj;
+        switch (ignoreCase ? name.toLowerCase() : name) {
+        case "amazonddbclient":
+        case "amazonDDBClient": target.getConfiguration().setAmazonDDBClient(property(camelContext, software.amazon.awssdk.services.dynamodb.DynamoDbClient.class, value)); return true;
+        case "consistentread":
+        case "consistentRead": target.getConfiguration().setConsistentRead(property(camelContext, boolean.class, value)); return true;
+        case "keyattributename":
+        case "keyAttributeName": target.getConfiguration().setKeyAttributeName(property(camelContext, java.lang.String.class, value)); return true;
+        case "keyattributetype":
+        case "keyAttributeType": target.getConfiguration().setKeyAttributeType(property(camelContext, java.lang.String.class, value)); return true;
+        case "lazystartproducer":
+        case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true;
+        case "operation": target.getConfiguration().setOperation(property(camelContext, org.apache.camel.component.aws2.ddb.Ddb2Operations.class, value)); return true;
+        case "proxyhost":
+        case "proxyHost": target.getConfiguration().setProxyHost(property(camelContext, java.lang.String.class, value)); return true;
+        case "proxyport":
+        case "proxyPort": target.getConfiguration().setProxyPort(property(camelContext, java.lang.Integer.class, value)); return true;
+        case "proxyprotocol":
+        case "proxyProtocol": target.getConfiguration().setProxyProtocol(property(camelContext, software.amazon.awssdk.core.Protocol.class, value)); return true;
+        case "readcapacity":
+        case "readCapacity": target.getConfiguration().setReadCapacity(property(camelContext, java.lang.Long.class, value)); return true;
+        case "region": target.getConfiguration().setRegion(property(camelContext, java.lang.String.class, value)); return true;
+        case "writecapacity":
+        case "writeCapacity": target.getConfiguration().setWriteCapacity(property(camelContext, java.lang.Long.class, value)); return true;
+        case "basicpropertybinding":
+        case "basicPropertyBinding": target.setBasicPropertyBinding(property(camelContext, boolean.class, value)); return true;
+        case "synchronous": target.setSynchronous(property(camelContext, boolean.class, value)); return true;
+        case "accesskey":
+        case "accessKey": target.getConfiguration().setAccessKey(property(camelContext, java.lang.String.class, value)); return true;
+        case "secretkey":
+        case "secretKey": target.getConfiguration().setSecretKey(property(camelContext, java.lang.String.class, value)); return true;
+        default: return false;
+        }
+    }
+
+}
+
diff --git a/components/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamComponentConfigurer.java b/components/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamComponentConfigurer.java
new file mode 100644
index 0000000..3e6e657
--- /dev/null
+++ b/components/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamComponentConfigurer.java
@@ -0,0 +1,33 @@
+/* Generated by camel build tools - do NOT edit this file! */
+package org.apache.camel.component.aws2.ddbstream;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.GeneratedPropertyConfigurer;
+import org.apache.camel.support.component.PropertyConfigurerSupport;
+
+/**
+ * Generated by camel build tools - do NOT edit this file!
+ */
+@SuppressWarnings("unchecked")
+public class Ddb2StreamComponentConfigurer extends PropertyConfigurerSupport implements GeneratedPropertyConfigurer {
+
+    @Override
+    public boolean configure(CamelContext camelContext, Object obj, String name, Object value, boolean ignoreCase) {
+        Ddb2StreamComponent target = (Ddb2StreamComponent) obj;
+        switch (ignoreCase ? name.toLowerCase() : name) {
+        case "accesskey":
+        case "accessKey": target.setAccessKey(property(camelContext, java.lang.String.class, value)); return true;
+        case "bridgeerrorhandler":
+        case "bridgeErrorHandler": target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); return true;
+        case "region": target.setRegion(property(camelContext, java.lang.String.class, value)); return true;
+        case "secretkey":
+        case "secretKey": target.setSecretKey(property(camelContext, java.lang.String.class, value)); return true;
+        case "basicpropertybinding":
+        case "basicPropertyBinding": target.setBasicPropertyBinding(property(camelContext, boolean.class, value)); return true;
+        case "configuration": target.setConfiguration(property(camelContext, org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration.class, value)); return true;
+        default: return false;
+        }
+    }
+
+}
+
diff --git a/components/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpointConfigurer.java b/components/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpointConfigurer.java
new file mode 100644
index 0000000..69ef617
--- /dev/null
+++ b/components/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpointConfigurer.java
@@ -0,0 +1,80 @@
+/* Generated by camel build tools - do NOT edit this file! */
+package org.apache.camel.component.aws2.ddbstream;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.GeneratedPropertyConfigurer;
+import org.apache.camel.support.component.PropertyConfigurerSupport;
+
+/**
+ * Generated by camel build tools - do NOT edit this file!
+ */
+@SuppressWarnings("unchecked")
+public class Ddb2StreamEndpointConfigurer extends PropertyConfigurerSupport implements GeneratedPropertyConfigurer {
+
+    @Override
+    public boolean configure(CamelContext camelContext, Object obj, String name, Object value, boolean ignoreCase) {
+        Ddb2StreamEndpoint target = (Ddb2StreamEndpoint) obj;
+        switch (ignoreCase ? name.toLowerCase() : name) {
+        case "amazondynamodbstreamsclient":
+        case "amazonDynamoDbStreamsClient": target.getConfiguration().setAmazonDynamoDbStreamsClient(property(camelContext, software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient.class, value)); return true;
+        case "bridgeerrorhandler":
+        case "bridgeErrorHandler": target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); return true;
+        case "iteratortype":
+        case "iteratorType": target.getConfiguration().setIteratorType(property(camelContext, software.amazon.awssdk.services.dynamodb.model.ShardIteratorType.class, value)); return true;
+        case "maxresultsperrequest":
+        case "maxResultsPerRequest": target.getConfiguration().setMaxResultsPerRequest(property(camelContext, int.class, value)); return true;
+        case "proxyhost":
+        case "proxyHost": target.getConfiguration().setProxyHost(property(camelContext, java.lang.String.class, value)); return true;
+        case "proxyport":
+        case "proxyPort": target.getConfiguration().setProxyPort(property(camelContext, java.lang.Integer.class, value)); return true;
+        case "proxyprotocol":
+        case "proxyProtocol": target.getConfiguration().setProxyProtocol(property(camelContext, software.amazon.awssdk.core.Protocol.class, value)); return true;
+        case "region": target.getConfiguration().setRegion(property(camelContext, java.lang.String.class, value)); return true;
+        case "sendemptymessagewhenidle":
+        case "sendEmptyMessageWhenIdle": target.setSendEmptyMessageWhenIdle(property(camelContext, boolean.class, value)); return true;
+        case "sequencenumberprovider":
+        case "sequenceNumberProvider": target.getConfiguration().setSequenceNumberProvider(property(camelContext, org.apache.camel.component.aws2.ddbstream.SequenceNumberProvider.class, value)); return true;
+        case "exceptionhandler":
+        case "exceptionHandler": target.setExceptionHandler(property(camelContext, org.apache.camel.spi.ExceptionHandler.class, value)); return true;
+        case "exchangepattern":
+        case "exchangePattern": target.setExchangePattern(property(camelContext, org.apache.camel.ExchangePattern.class, value)); return true;
+        case "pollstrategy":
+        case "pollStrategy": target.setPollStrategy(property(camelContext, org.apache.camel.spi.PollingConsumerPollStrategy.class, value)); return true;
+        case "basicpropertybinding":
+        case "basicPropertyBinding": target.setBasicPropertyBinding(property(camelContext, boolean.class, value)); return true;
+        case "synchronous": target.setSynchronous(property(camelContext, boolean.class, value)); return true;
+        case "backofferrorthreshold":
+        case "backoffErrorThreshold": target.setBackoffErrorThreshold(property(camelContext, int.class, value)); return true;
+        case "backoffidlethreshold":
+        case "backoffIdleThreshold": target.setBackoffIdleThreshold(property(camelContext, int.class, value)); return true;
+        case "backoffmultiplier":
+        case "backoffMultiplier": target.setBackoffMultiplier(property(camelContext, int.class, value)); return true;
+        case "delay": target.setDelay(property(camelContext, long.class, value)); return true;
+        case "greedy": target.setGreedy(property(camelContext, boolean.class, value)); return true;
+        case "initialdelay":
+        case "initialDelay": target.setInitialDelay(property(camelContext, long.class, value)); return true;
+        case "repeatcount":
+        case "repeatCount": target.setRepeatCount(property(camelContext, long.class, value)); return true;
+        case "runlogginglevel":
+        case "runLoggingLevel": target.setRunLoggingLevel(property(camelContext, org.apache.camel.LoggingLevel.class, value)); return true;
+        case "scheduledexecutorservice":
+        case "scheduledExecutorService": target.setScheduledExecutorService(property(camelContext, java.util.concurrent.ScheduledExecutorService.class, value)); return true;
+        case "scheduler": target.setScheduler(property(camelContext, java.lang.String.class, value)); return true;
+        case "schedulerproperties":
+        case "schedulerProperties": target.setSchedulerProperties(property(camelContext, java.util.Map.class, value)); return true;
+        case "startscheduler":
+        case "startScheduler": target.setStartScheduler(property(camelContext, boolean.class, value)); return true;
+        case "timeunit":
+        case "timeUnit": target.setTimeUnit(property(camelContext, java.util.concurrent.TimeUnit.class, value)); return true;
+        case "usefixeddelay":
+        case "useFixedDelay": target.setUseFixedDelay(property(camelContext, boolean.class, value)); return true;
+        case "accesskey":
+        case "accessKey": target.getConfiguration().setAccessKey(property(camelContext, java.lang.String.class, value)); return true;
+        case "secretkey":
+        case "secretKey": target.getConfiguration().setSecretKey(property(camelContext, java.lang.String.class, value)); return true;
+        default: return false;
+        }
+    }
+
+}
+
diff --git a/components/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/StringSequenceNumberConverterLoader.java b/components/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/StringSequenceNumberConverterLoader.java
new file mode 100644
index 0000000..7590721
--- /dev/null
+++ b/components/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/StringSequenceNumberConverterLoader.java
@@ -0,0 +1,40 @@
+/* Generated by camel build tools - do NOT edit this file! */
+package org.apache.camel.component.aws2.ddbstream;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.TypeConversionException;
+import org.apache.camel.TypeConverterLoaderException;
+import org.apache.camel.spi.TypeConverterLoader;
+import org.apache.camel.spi.TypeConverterRegistry;
+import org.apache.camel.support.SimpleTypeConverter;
+import org.apache.camel.support.TypeConverterSupport;
+import org.apache.camel.util.DoubleMap;
+
+/**
+ * Generated by camel build tools - do NOT edit this file!
+ */
+@SuppressWarnings("unchecked")
+public final class StringSequenceNumberConverterLoader implements TypeConverterLoader {
+
+    public StringSequenceNumberConverterLoader() {
+    }
+
+    @Override
+    public void load(TypeConverterRegistry registry) throws TypeConverterLoaderException {
+        try {
+            registerConverters(registry);
+        } catch (Throwable e) {
+            // ignore on load error
+        }
+    }
+
+    private void registerConverters(TypeConverterRegistry registry) {
+        addTypeConverter(registry, org.apache.camel.component.aws2.ddbstream.SequenceNumberProvider.class, java.lang.String.class, false,
+            (type, exchange, value) -> org.apache.camel.component.aws2.ddbstream.StringSequenceNumberConverter.toSequenceNumberProvider((java.lang.String) value));
+    }
+
+    private static void addTypeConverter(TypeConverterRegistry registry, Class<?> toType, Class<?> fromType, boolean allowNull, SimpleTypeConverter.ConversionMethod method) { 
+        registry.addTypeConverter(toType, fromType, new SimpleTypeConverter(allowNull, method));
+    }
+
+}
diff --git a/components/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/TypeConverterLoader b/components/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/TypeConverterLoader
new file mode 100644
index 0000000..d5cbd3a
--- /dev/null
+++ b/components/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/TypeConverterLoader
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+org.apache.camel.component.aws2.ddbstream.StringSequenceNumberConverterLoader
diff --git a/components/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/configurer/aws2-ddb-component b/components/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/configurer/aws2-ddb-component
new file mode 100644
index 0000000..1931813
--- /dev/null
+++ b/components/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/configurer/aws2-ddb-component
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.component.aws2.ddb.Ddb2ComponentConfigurer
diff --git a/components/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/configurer/aws2-ddb-endpoint b/components/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/configurer/aws2-ddb-endpoint
new file mode 100644
index 0000000..69b5b38
--- /dev/null
+++ b/components/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/configurer/aws2-ddb-endpoint
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.component.aws2.ddb.Ddb2EndpointConfigurer
diff --git a/components/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/configurer/aws2-ddbstream-component b/components/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/configurer/aws2-ddbstream-component
new file mode 100644
index 0000000..d69a890
--- /dev/null
+++ b/components/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/configurer/aws2-ddbstream-component
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.component.aws2.ddbstream.Ddb2StreamComponentConfigurer
diff --git a/components/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/configurer/aws2-ddbstream-endpoint b/components/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/configurer/aws2-ddbstream-endpoint
new file mode 100644
index 0000000..7a371a3
--- /dev/null
+++ b/components/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/configurer/aws2-ddbstream-endpoint
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.component.aws2.ddbstream.Ddb2StreamEndpointConfigurer
diff --git a/components/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/other.properties b/components/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/other.properties
new file mode 100644
index 0000000..6ce1e1a
--- /dev/null
+++ b/components/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/other.properties
@@ -0,0 +1,7 @@
+# Generated by camel build tools - do NOT edit this file!
+name=aws2-ddb
+groupId=org.apache.camel
+artifactId=camel-aws2-ddb
+version=3.1.0-SNAPSHOT
+projectName=Camel :: AWS2 DDB
+projectDescription=A Camel Amazon DynamoDB Web Service Component Version 2
diff --git a/components/camel-aws2-ddb/src/generated/resources/aws2-ddb.json b/components/camel-aws2-ddb/src/generated/resources/aws2-ddb.json
new file mode 100644
index 0000000..b9be952
--- /dev/null
+++ b/components/camel-aws2-ddb/src/generated/resources/aws2-ddb.json
@@ -0,0 +1,12 @@
+{
+  "other": {
+    "kind": "other",
+    "name": "aws2-ddb",
+    "title": "Aws2 Ddb",
+    "description": "A Camel Amazon DynamoDB Web Service Component Version 2",
+    "deprecated": false,
+    "groupId": "org.apache.camel",
+    "artifactId": "camel-aws2-ddb",
+    "version": "3.1.0-SNAPSHOT"
+  }
+}
diff --git a/components/camel-aws2-ddb/src/generated/resources/org/apache/camel/component/aws2/ddb/aws2-ddb.json b/components/camel-aws2-ddb/src/generated/resources/org/apache/camel/component/aws2/ddb/aws2-ddb.json
new file mode 100644
index 0000000..0133e81
--- /dev/null
+++ b/components/camel-aws2-ddb/src/generated/resources/org/apache/camel/component/aws2/ddb/aws2-ddb.json
@@ -0,0 +1,48 @@
+{
+  "component": {
+    "kind": "component",
+    "scheme": "aws2-ddb",
+    "extendsScheme": "",
+    "syntax": "aws2-ddb:tableName",
+    "title": "AWS 2 DynamoDB",
+    "description": "The aws-ddb component is used for storing and retrieving data from Amazon's DynamoDB service.",
+    "label": "cloud,database,nosql",
+    "deprecated": false,
+    "async": false,
+    "consumerOnly": false,
+    "producerOnly": true,
+    "lenientProperties": false,
+    "javaType": "org.apache.camel.component.aws2.ddb.Ddb2Component",
+    "firstVersion": "3.1.0",
+    "groupId": "org.apache.camel",
+    "artifactId": "camel-aws2-ddb",
+    "version": "3.1.0-SNAPSHOT"
+  },
+  "componentProperties": {
+    "accessKey": { "kind": "property", "displayName": "Access Key", "group": "producer", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "Amazon AWS Access Key" },
+    "lazyStartProducer": { "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the r [...]
+    "region": { "kind": "property", "displayName": "Region", "group": "producer", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "The region in which DDB client needs to work" },
+    "secretKey": { "kind": "property", "displayName": "Secret Key", "group": "producer", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "Amazon AWS Secret Key" },
+    "basicPropertyBinding": { "kind": "property", "displayName": "Basic Property Binding", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the component should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities" },
+    "configuration": { "kind": "property", "displayName": "Configuration", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.aws2.ddb.Ddb2Configuration", "deprecated": false, "secret": false, "description": "The AWS DDB default configuration" }
+  },
+  "properties": {
+    "tableName": { "kind": "path", "displayName": "Table Name", "group": "producer", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "secret": false, "configurationClass": "org.apache.camel.component.aws2.ddb.Ddb2Configuration", "configurationField": "configuration", "description": "The name of the table currently worked with." },
+    "amazonDDBClient": { "kind": "parameter", "displayName": "Amazon DDBClient", "group": "producer", "label": "", "required": false, "type": "object", "javaType": "software.amazon.awssdk.services.dynamodb.DynamoDbClient", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.ddb.Ddb2Configuration", "configurationField": "configuration", "description": "To use the AmazonDynamoDB as the client" },
+    "consistentRead": { "kind": "parameter", "displayName": "Consistent Read", "group": "producer", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.aws2.ddb.Ddb2Configuration", "configurationField": "configuration", "description": "Determines whether or not strong consistency should be enforced when data is read." },
+    "keyAttributeName": { "kind": "parameter", "displayName": "Key Attribute Name", "group": "producer", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.ddb.Ddb2Configuration", "configurationField": "configuration", "description": "Attribute name when creating table" },
+    "keyAttributeType": { "kind": "parameter", "displayName": "Key Attribute Type", "group": "producer", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.ddb.Ddb2Configuration", "configurationField": "configuration", "description": "Attribute type when creating table" },
+    "lazyStartProducer": { "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the  [...]
+    "operation": { "kind": "parameter", "displayName": "Operation", "group": "producer", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.component.aws2.ddb.Ddb2Operations", "enum": [ "BatchGetItems", "DeleteItem", "DeleteTable", "DescribeTable", "GetItem", "PutItem", "Query", "Scan", "UpdateItem", "UpdateTable" ], "deprecated": false, "secret": false, "defaultValue": "PutItem", "configurationClass": "org.apache.camel.component.aws2.ddb.Ddb2Configuration",  [...]
+    "proxyHost": { "kind": "parameter", "displayName": "Proxy Host", "group": "producer", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.ddb.Ddb2Configuration", "configurationField": "configuration", "description": "To define a proxy host when instantiating the DDB client" },
+    "proxyPort": { "kind": "parameter", "displayName": "Proxy Port", "group": "producer", "label": "", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.ddb.Ddb2Configuration", "configurationField": "configuration", "description": "To define a proxy port when instantiating the DDB client. When using this parameter, the configuration will expect the capitalized name of the reg [...]
+    "proxyProtocol": { "kind": "parameter", "displayName": "Proxy Protocol", "group": "producer", "label": "", "required": false, "type": "object", "javaType": "software.amazon.awssdk.core.Protocol", "enum": [ "HTTP", "HTTPS" ], "deprecated": false, "secret": false, "defaultValue": "HTTPS", "configurationClass": "org.apache.camel.component.aws2.ddb.Ddb2Configuration", "configurationField": "configuration", "description": "To define a proxy protocol when instantiating the DDB client" },
+    "readCapacity": { "kind": "parameter", "displayName": "Read Capacity", "group": "producer", "label": "", "required": false, "type": "integer", "javaType": "java.lang.Long", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.ddb.Ddb2Configuration", "configurationField": "configuration", "description": "The provisioned throughput to reserve for reading resources from your table" },
+    "region": { "kind": "parameter", "displayName": "Region", "group": "producer", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.ddb.Ddb2Configuration", "configurationField": "configuration", "description": "The region in which DDB client needs to work" },
+    "writeCapacity": { "kind": "parameter", "displayName": "Write Capacity", "group": "producer", "label": "", "required": false, "type": "integer", "javaType": "java.lang.Long", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.ddb.Ddb2Configuration", "configurationField": "configuration", "description": "The provisioned throughput to reserved for writing resources to your table" },
+    "basicPropertyBinding": { "kind": "parameter", "displayName": "Basic Property Binding", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities" },
+    "synchronous": { "kind": "parameter", "displayName": "Synchronous", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "false", "description": "Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported)." },
+    "accessKey": { "kind": "parameter", "displayName": "Access Key", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": true, "configurationClass": "org.apache.camel.component.aws2.ddb.Ddb2Configuration", "configurationField": "configuration", "description": "Amazon AWS Access Key" },
+    "secretKey": { "kind": "parameter", "displayName": "Secret Key", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": true, "configurationClass": "org.apache.camel.component.aws2.ddb.Ddb2Configuration", "configurationField": "configuration", "description": "Amazon AWS Secret Key" }
+  }
+}
diff --git a/components/camel-aws2-ddb/src/generated/resources/org/apache/camel/component/aws2/ddbstream/aws2-ddbstream.json b/components/camel-aws2-ddb/src/generated/resources/org/apache/camel/component/aws2/ddbstream/aws2-ddbstream.json
new file mode 100644
index 0000000..f25950d
--- /dev/null
+++ b/components/camel-aws2-ddb/src/generated/resources/org/apache/camel/component/aws2/ddbstream/aws2-ddbstream.json
@@ -0,0 +1,63 @@
+{
+  "component": {
+    "kind": "component",
+    "scheme": "aws2-ddbstream",
+    "extendsScheme": "",
+    "syntax": "aws2-ddbstream:tableName",
+    "title": "AWS 2 DynamoDB Streams",
+    "description": "The aws-ddbstream component is used for working with Amazon DynamoDB Streams.",
+    "label": "cloud,messaging,streams",
+    "deprecated": false,
+    "async": false,
+    "consumerOnly": true,
+    "producerOnly": false,
+    "lenientProperties": false,
+    "javaType": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamComponent",
+    "firstVersion": "3.1.0",
+    "groupId": "org.apache.camel",
+    "artifactId": "camel-aws2-ddb",
+    "version": "3.1.0-SNAPSHOT"
+  },
+  "componentProperties": {
+    "accessKey": { "kind": "property", "displayName": "Access Key", "group": "consumer", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "Amazon AWS Access Key" },
+    "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by [...]
+    "region": { "kind": "property", "displayName": "Region", "group": "consumer", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "Amazon AWS Region" },
+    "secretKey": { "kind": "property", "displayName": "Secret Key", "group": "consumer", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "Amazon AWS Secret Key" },
+    "basicPropertyBinding": { "kind": "property", "displayName": "Basic Property Binding", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the component should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities" },
+    "configuration": { "kind": "property", "displayName": "Configuration", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "deprecated": false, "secret": false, "description": "The AWS DDB stream default configuration" }
+  },
+  "properties": {
+    "tableName": { "kind": "path", "displayName": "Table Name", "group": "consumer", "label": "consumer", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "secret": false, "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configurationField": "configuration", "description": "Name of the dynamodb table" },
+    "amazonDynamoDbStreamsClient": { "kind": "parameter", "displayName": "Amazon Dynamo Db Streams Client", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configurationField": "configuration", "description": "Amazon DynamoDB client to use for a [...]
+    "bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled b [...]
+    "iteratorType": { "kind": "parameter", "displayName": "Iterator Type", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "software.amazon.awssdk.services.dynamodb.model.ShardIteratorType", "enum": [ "TRIM_HORIZON", "LATEST", "AT_SEQUENCE_NUMBER", "AFTER_SEQUENCE_NUMBER", "null" ], "deprecated": false, "secret": false, "defaultValue": "LATEST", "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configura [...]
+    "maxResultsPerRequest": { "kind": "parameter", "displayName": "Max Results Per Request", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configurationField": "configuration", "description": "Maximum number of records that will be fetched in each poll" },
+    "proxyHost": { "kind": "parameter", "displayName": "Proxy Host", "group": "consumer", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configurationField": "configuration", "description": "To define a proxy host when instantiating the DDBStreams client" },
+    "proxyPort": { "kind": "parameter", "displayName": "Proxy Port", "group": "consumer", "label": "", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configurationField": "configuration", "description": "To define a proxy port when instantiating the DDBStreams client" },
+    "proxyProtocol": { "kind": "parameter", "displayName": "Proxy Protocol", "group": "consumer", "label": "", "required": false, "type": "object", "javaType": "software.amazon.awssdk.core.Protocol", "enum": [ "HTTP", "HTTPS" ], "deprecated": false, "secret": false, "defaultValue": "HTTPS", "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configurationField": "configuration", "description": "To define a proxy protocol when instantiating the DDBS [...]
+    "region": { "kind": "parameter", "displayName": "Region", "group": "consumer", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configurationField": "configuration", "description": "The region in which DDBStreams client needs to work" },
+    "sendEmptyMessageWhenIdle": { "kind": "parameter", "displayName": "Send Empty Message When Idle", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "If the polling consumer did not poll any files, you can enable this option to send an empty message (no body) instead." },
+    "sequenceNumberProvider": { "kind": "parameter", "displayName": "Sequence Number Provider", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.aws2.ddbstream.SequenceNumberProvider", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configurationField": "configuration", "description": "Provider for the sequence number when using one of th [...]
+    "exceptionHandler": { "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with [...]
+    "exchangePattern": { "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", "InOptionalOut" ], "deprecated": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." },
+    "pollStrategy": { "kind": "parameter", "displayName": "Poll Strategy", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.PollingConsumerPollStrategy", "deprecated": false, "secret": false, "description": "A pluggable org.apache.camel.PollingConsumerPollingStrategy allowing you to provide your custom implementation to control error handling usually occurred during the poll operation before an Exchange h [...]
+    "basicPropertyBinding": { "kind": "parameter", "displayName": "Basic Property Binding", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities" },
+    "synchronous": { "kind": "parameter", "displayName": "Synchronous", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "false", "description": "Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported)." },
+    "backoffErrorThreshold": { "kind": "parameter", "displayName": "Backoff Error Threshold", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "description": "The number of subsequent error polls (failed due some error) that should happen before the backoffMultipler should kick-in." },
+    "backoffIdleThreshold": { "kind": "parameter", "displayName": "Backoff Idle Threshold", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "description": "The number of subsequent idle polls that should happen before the backoffMultipler should kick-in." },
+    "backoffMultiplier": { "kind": "parameter", "displayName": "Backoff Multiplier", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "description": "To let the scheduled polling consumer backoff if there has been a number of subsequent idles\/errors in a row. The multiplier is then the number of polls that will be skipped before the next actual attempt is happening again. When this option  [...]
+    "delay": { "kind": "parameter", "displayName": "Delay", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": "500", "description": "Milliseconds before the next poll. You can also specify time values using units, such as 60s (60 seconds), 5m30s (5 minutes and 30 seconds), and 1h (1 hour)." },
+    "greedy": { "kind": "parameter", "displayName": "Greedy", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "If greedy is enabled, then the ScheduledPollConsumer will run immediately again, if the previous run polled 1 or more messages." },
+    "initialDelay": { "kind": "parameter", "displayName": "Initial Delay", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": "1000", "description": "Milliseconds before the first poll starts. You can also specify time values using units, such as 60s (60 seconds), 5m30s (5 minutes and 30 seconds), and 1h (1 hour)." },
+    "repeatCount": { "kind": "parameter", "displayName": "Repeat Count", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": "0", "description": "Specifies a maximum limit of number of fires. So if you set it to 1, the scheduler will only fire once. If you set it to 5, it will only fire five times. A value of zero or negative means fire forever." },
+    "runLoggingLevel": { "kind": "parameter", "displayName": "Run Logging Level", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "object", "javaType": "org.apache.camel.LoggingLevel", "enum": [ "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "OFF" ], "deprecated": false, "secret": false, "defaultValue": "TRACE", "description": "The consumer logs a start\/complete log line when it polls. This option allows you to configure the logging level for that." },
+    "scheduledExecutorService": { "kind": "parameter", "displayName": "Scheduled Executor Service", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "object", "javaType": "java.util.concurrent.ScheduledExecutorService", "deprecated": false, "secret": false, "description": "Allows for configuring a custom\/shared thread pool to use for the consumer. By default each consumer has its own single threaded thread pool." },
+    "scheduler": { "kind": "parameter", "displayName": "Scheduler", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "none", "spring", "quartz" ], "deprecated": false, "secret": false, "defaultValue": "none", "description": "To use a cron scheduler from either camel-spring or camel-quartz component" },
+    "schedulerProperties": { "kind": "parameter", "displayName": "Scheduler Properties", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "prefix": "scheduler.", "multiValue": true, "deprecated": false, "secret": false, "description": "To configure additional properties when using a custom scheduler or any of the Quartz, Spring based scheduler." },
+    "startScheduler": { "kind": "parameter", "displayName": "Start Scheduler", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "true", "description": "Whether the scheduler should be auto started." },
+    "timeUnit": { "kind": "parameter", "displayName": "Time Unit", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "object", "javaType": "java.util.concurrent.TimeUnit", "enum": [ "NANOSECONDS", "MICROSECONDS", "MILLISECONDS", "SECONDS", "MINUTES", "HOURS", "DAYS" ], "deprecated": false, "secret": false, "defaultValue": "MILLISECONDS", "description": "Time unit for initialDelay and delay options." },
+    "useFixedDelay": { "kind": "parameter", "displayName": "Use Fixed Delay", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "true", "description": "Controls if fixed delay or fixed rate is used. See ScheduledExecutorService in JDK for details." },
+    "accessKey": { "kind": "parameter", "displayName": "Access Key", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": true, "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configurationField": "configuration", "description": "Amazon AWS Access Key" },
+    "secretKey": { "kind": "parameter", "displayName": "Secret Key", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": true, "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configurationField": "configuration", "description": "Amazon AWS Secret Key" }
+  }
+}
diff --git a/components/camel-aws2-ddb/src/main/docs/aws2-ddb-component.adoc b/components/camel-aws2-ddb/src/main/docs/aws2-ddb-component.adoc
new file mode 100644
index 0000000..f93f0c7
--- /dev/null
+++ b/components/camel-aws2-ddb/src/main/docs/aws2-ddb-component.adoc
@@ -0,0 +1,407 @@
+[[aws-ddb-component]]
+= AWS DynamoDB Component
+
+*Since Camel 2.10*
+
+// HEADER START
+*Only producer is supported*
+// HEADER END
+
+The DynamoDB component supports storing and retrieving data from/to
+https://aws.amazon.com/dynamodb[Amazon's DynamoDB] service.
+
+Prerequisites
+
+You must have a valid Amazon Web Services developer account, and be
+signed up to use Amazon DynamoDB. More information is available at
+https://aws.amazon.com/dynamodb[Amazon DynamoDB].
+
+== URI Format
+
+[source,java]
+------------------------------
+aws-ddb://domainName[?options]
+------------------------------
+
+You can append query options to the URI in the following format,
+?options=value&option2=value&...
+
+== URI Options
+
+
+// component options: START
+The AWS DynamoDB component supports 6 options, which are listed below.
+
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *accessKey* (producer) | Amazon AWS Access Key |  | String
+| *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and [...]
+| *region* (producer) | The region in which DDB client needs to work |  | String
+| *secretKey* (producer) | Amazon AWS Secret Key |  | String
+| *basicPropertyBinding* (advanced) | Whether the component should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean
+| *configuration* (advanced) | The AWS DDB default configuration |  | DdbConfiguration
+|===
+// component options: END
+
+
+
+
+// endpoint options: START
+The AWS DynamoDB endpoint is configured using URI syntax:
+
+----
+aws-ddb:tableName
+----
+
+with the following path and query parameters:
+
+=== Path Parameters (1 parameters):
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *tableName* | *Required* The name of the table currently worked with. |  | String
+|===
+
+
+=== Query Parameters (16 parameters):
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *amazonDDBClient* (producer) | To use the AmazonDynamoDB as the client |  | AmazonDynamoDB
+| *consistentRead* (producer) | Determines whether or not strong consistency should be enforced when data is read. | false | boolean
+| *keyAttributeName* (producer) | Attribute name when creating table |  | String
+| *keyAttributeType* (producer) | Attribute type when creating table |  | String
+| *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and [...]
+| *operation* (producer) | What operation to perform. The value can be one of: BatchGetItems, DeleteItem, DeleteTable, DescribeTable, GetItem, PutItem, Query, Scan, UpdateItem, UpdateTable | PutItem | DdbOperations
+| *proxyHost* (producer) | To define a proxy host when instantiating the DDB client |  | String
+| *proxyPort* (producer) | To define a proxy port when instantiating the DDB client. When using this parameter, the configuration will expect the capitalized name of the region (for example AP_EAST_1) You'll need to use the name Regions.EU_WEST_1.name() |  | Integer
+| *proxyProtocol* (producer) | To define a proxy protocol when instantiating the DDB client. The value can be one of: HTTP, HTTPS | HTTPS | Protocol
+| *readCapacity* (producer) | The provisioned throughput to reserve for reading resources from your table |  | Long
+| *region* (producer) | The region in which DDB client needs to work |  | String
+| *writeCapacity* (producer) | The provisioned throughput to reserved for writing resources to your table |  | Long
+| *basicPropertyBinding* (advanced) | Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean
+| *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean
+| *accessKey* (security) | Amazon AWS Access Key |  | String
+| *secretKey* (security) | Amazon AWS Secret Key |  | String
+|===
+// endpoint options: END
+// spring-boot-auto-configure options: START
+== Spring Boot Auto-Configuration
+
+When using Spring Boot make sure to use the following Maven dependency to have support for auto configuration:
+
+[source,xml]
+----
+<dependency>
+  <groupId>org.apache.camel.springboot</groupId>
+  <artifactId>camel-aws-ddb-starter</artifactId>
+  <version>x.x.x</version>
+  <!-- use the same version as your Camel core version -->
+</dependency>
+----
+
+
+The component supports 21 options, which are listed below.
+
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *camel.component.aws-ddb.access-key* | Amazon AWS Access Key |  | String
+| *camel.component.aws-ddb.basic-property-binding* | Whether the component should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | Boolean
+| *camel.component.aws-ddb.bridge-error-handler* | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | Boolean
+| *camel.component.aws-ddb.configuration.access-key* | Amazon AWS Access Key |  | String
+| *camel.component.aws-ddb.configuration.amazon-d-d-b-client* | To use the AmazonDynamoDB as the client |  | AmazonDynamoDB
+| *camel.component.aws-ddb.configuration.consistent-read* | Determines whether or not strong consistency should be enforced when data is read. | false | Boolean
+| *camel.component.aws-ddb.configuration.key-attribute-name* | Attribute name when creating table |  | String
+| *camel.component.aws-ddb.configuration.key-attribute-type* | Attribute type when creating table |  | String
+| *camel.component.aws-ddb.configuration.operation* | What operation to perform |  | DdbOperations
+| *camel.component.aws-ddb.configuration.proxy-host* | To define a proxy host when instantiating the DDB client |  | String
+| *camel.component.aws-ddb.configuration.proxy-port* | To define a proxy port when instantiating the DDB client. When using this parameter, the configuration will expect the capitalized name of the region (for example AP_EAST_1) You'll need to use the name Regions.EU_WEST_1.name() |  | Integer
+| *camel.component.aws-ddb.configuration.proxy-protocol* | To define a proxy protocol when instantiating the DDB client |  | Protocol
+| *camel.component.aws-ddb.configuration.read-capacity* | The provisioned throughput to reserve for reading resources from your table |  | Long
+| *camel.component.aws-ddb.configuration.region* | The region in which DDB client needs to work |  | String
+| *camel.component.aws-ddb.configuration.secret-key* | Amazon AWS Secret Key |  | String
+| *camel.component.aws-ddb.configuration.table-name* | The name of the table currently worked with. |  | String
+| *camel.component.aws-ddb.configuration.write-capacity* | The provisioned throughput to reserved for writing resources to your table |  | Long
+| *camel.component.aws-ddb.enabled* | Whether to enable auto configuration of the aws-ddb component. This is enabled by default. |  | Boolean
+| *camel.component.aws-ddb.lazy-start-producer* | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed th [...]
+| *camel.component.aws-ddb.region* | The region in which DDB client needs to work |  | String
+| *camel.component.aws-ddb.secret-key* | Amazon AWS Secret Key |  | String
+|===
+// spring-boot-auto-configure options: END
+
+
+
+
+Required DDB component options
+
+You have to provide the amazonDDBClient in the
+Registry or your accessKey and secretKey to access
+the https://aws.amazon.com/dynamodb[Amazon's DynamoDB].
+
+== Usage
+
+=== Message headers evaluated by the DDB producer
+
+[width="100%",cols="10%,10%,80%",options="header",]
+|=======================================================================
+|Header |Type |Description
+
+|`CamelAwsDdbBatchItems` |`Map<String, KeysAndAttributes>` |A map of the table name and corresponding items to get by primary key.
+
+|`CamelAwsDdbTableName` |`String` |Table Name for this operation.
+
+|`CamelAwsDdbKey` |`Key` |The primary key that uniquely identifies each item in a table.
+
+|`CamelAwsDdbReturnValues` |`String` |Use this parameter if you want to get the attribute name-value pairs
+before or after they are modified(NONE, ALL_OLD, UPDATED_OLD, ALL_NEW,
+UPDATED_NEW).
+
+|`CamelAwsDdbUpdateCondition` |`Map<String, ExpectedAttributeValue>` |Designates an attribute for a conditional modification.
+
+|`CamelAwsDdbAttributeNames` |`Collection<String>` |If attribute names are not specified then all attributes will be
+returned.
+
+|`CamelAwsDdbConsistentRead` |`Boolean` |If set to true, then a consistent read is issued, otherwise eventually
+consistent is used.
+
+|`CamelAwsDdbIndexName` |`String` |If set will be used as Secondary Index for Query operation.
+
+|`CamelAwsDdbItem` |`Map<String, AttributeValue>` |A map of the attributes for the item, and must include the primary key
+values that define the item.
+
+|`CamelAwsDdbExactCount` |`Boolean` |If set to true, Amazon DynamoDB returns a total number of items that
+match the query parameters, instead of a list of the matching items and
+their attributes.
+
+|`CamelAwsDdbKeyConditions` |`Map<String, Condition>` |This header specify the selection criteria for the
+query, and merge together the two old headers *CamelAwsDdbHashKeyValue*
+and *CamelAwsDdbScanRangeKeyCondition*
+
+|`CamelAwsDdbStartKey` |`Key` |Primary key of the item from which to continue an earlier query.
+
+|`CamelAwsDdbHashKeyValue` |`AttributeValue` |Value of the hash component of the composite primary key.
+
+|`CamelAwsDdbLimit` |`Integer` |The maximum number of items to return.
+
+|`CamelAwsDdbScanRangeKeyCondition` |`Condition` |A container for the attribute values and comparison operators to use for
+the query.
+
+|`CamelAwsDdbScanIndexForward` |`Boolean` |Specifies forward or backward traversal of the index.
+
+|`CamelAwsDdbScanFilter` |`Map<String, Condition>` |Evaluates the scan results and returns only the desired values.
+
+|`CamelAwsDdbUpdateValues` |`Map<String, AttributeValueUpdate>` |Map of attribute name to the new value and action for the update.
+|=======================================================================
+
+=== Message headers set during BatchGetItems operation
+
+[width="100%",cols="10%,10%,80%",options="header",]
+|=======================================================================
+|Header |Type |Description
+
+|`CamelAwsDdbBatchResponse` |`Map<String,BatchResponse>` |Table names and the respective item attributes from the tables.
+
+|`CamelAwsDdbUnprocessedKeys` |`Map<String,KeysAndAttributes>` |Contains a map of tables and their respective keys that were not
+processed with the current response.
+|=======================================================================
+
+=== Message headers set during DeleteItem operation
+
+[width="100%",cols="10%,10%,80%",options="header",]
+|=======================================================================
+|Header |Type |Description
+
+|`CamelAwsDdbAttributes` |`Map<String, AttributeValue>` |The list of attributes returned by the operation.
+|=======================================================================
+
+=== Message headers set during DeleteTable operation
+
+[width="100%",cols="10%,10%,80%",options="header",]
+|=======================================================================
+|Header |Type |Description
+
+|`CamelAwsDdbProvisionedThroughput`| | 
+
+|`ProvisionedThroughputDescription` | |The value of the ProvisionedThroughput property for this table
+
+|`CamelAwsDdbCreationDate` |`Date` |Creation DateTime of this table.
+
+|`CamelAwsDdbTableItemCount` |`Long` |Item count for this table.
+
+|`CamelAwsDdbKeySchema` |`KeySchema` |The KeySchema that identifies the primary key for this table. 
+*From Camel 2.16.0 the type of this header is List<KeySchemaElement> and not KeySchema*
+
+|`CamelAwsDdbTableName` |`String` |The table name. 
+
+|`CamelAwsDdbTableSize` |`Long` |The table size in bytes. 
+
+|`CamelAwsDdbTableStatus` |`String` |The status of the table: CREATING, UPDATING, DELETING, ACTIVE
+|=======================================================================
+
+=== Message headers set during DescribeTable operation
+
+[width="100%",cols="10%,10%,80%",options="header",]
+|=======================================================================
+|Header |Type |Description
+
+|`CamelAwsDdbProvisionedThroughput` |{\{ProvisionedThroughputDescription\}} |The value of the ProvisionedThroughput property for this table
+
+|`CamelAwsDdbCreationDate` |`Date` |Creation DateTime of this table.
+
+|`CamelAwsDdbTableItemCount` |`Long` |Item count for this table.
+
+|`CamelAwsDdbKeySchema` |{\{KeySchema\}} |The KeySchema that identifies the primary key for this table.
+
+
+|`CamelAwsDdbTableName` |`String` |The table name.
+
+|`CamelAwsDdbTableSize` |`Long` |The table size in bytes.
+
+|`CamelAwsDdbTableStatus` |`String` |The status of the table: CREATING, UPDATING, DELETING, ACTIVE
+
+|`CamelAwsDdbReadCapacity` |`Long` |ReadCapacityUnits property of this table.
+
+|`CamelAwsDdbWriteCapacity` |`Long` |WriteCapacityUnits property of this table.
+|=======================================================================
+
+=== Message headers set during GetItem operation
+
+[width="100%",cols="10%,10%,80%",options="header",]
+|=======================================================================
+|Header |Type |Description
+
+|`CamelAwsDdbAttributes` |`Map<String, AttributeValue>` |The list of attributes returned by the operation.
+
+|=======================================================================
+
+=== Message headers set during PutItem operation
+
+[width="100%",cols="10%,10%,80%",options="header",]
+|=======================================================================
+|Header |Type |Description
+
+|`CamelAwsDdbAttributes` |`Map<String, AttributeValue>` |The list of attributes returned by the operation.
+
+|=======================================================================
+
+=== Message headers set during Query operation
+
+[width="100%",cols="10%,10%,80%",options="header",]
+|=======================================================================
+|Header |Type |Description
+
+|`CamelAwsDdbItems` |`List<java.util.Map<String,AttributeValue>>` |The list of attributes returned by the operation.
+
+|`CamelAwsDdbLastEvaluatedKey` |`Key` |Primary key of the item where the query operation stopped, inclusive of
+the previous result set.
+
+|`CamelAwsDdbConsumedCapacity` |`Double` |The number of Capacity Units of the provisioned throughput of the table
+consumed during the operation.
+
+|`CamelAwsDdbCount` |`Integer` |Number of items in the response.
+|=======================================================================
+
+=== Message headers set during Scan operation
+
+[width="100%",cols="10%,10%,80%",options="header",]
+|=======================================================================
+|Header |Type |Description
+
+|`CamelAwsDdbItems` |`List<java.util.Map<String,AttributeValue>>` |The list of attributes returned by the operation.
+
+|`CamelAwsDdbLastEvaluatedKey` |`Key` |Primary key of the item where the query operation stopped, inclusive of
+the previous result set.
+
+|`CamelAwsDdbConsumedCapacity` |`Double` |The number of Capacity Units of the provisioned throughput of the table
+consumed during the operation.
+
+|`CamelAwsDdbCount` |`Integer` |Number of items in the response.
+
+|`CamelAwsDdbScannedCount` |`Integer` |Number of items in the complete scan before any filters are applied.
+|=======================================================================
+
+=== Message headers set during UpdateItem operation
+
+[width="100%",cols="10%,10%,80%",options="header",]
+|=======================================================================
+|Header |Type |Description
+
+|`CamelAwsDdbAttributes` |`Map<String, AttributeValue>` |The list of attributes returned by the operation.
+
+|=======================================================================
+
+=== Advanced AmazonDynamoDB configuration
+
+If you need more control over the `AmazonDynamoDB` instance
+configuration you can create your own instance and refer to it from the
+URI:
+
+[source,java]
+----------------------------------------------------
+from("direct:start")
+.to("aws-ddb://domainName?amazonDDBClient=#client");
+----------------------------------------------------
+
+The `#client` refers to a `AmazonDynamoDB` in the
+Registry.
+
+For example if your Camel Application is running behind a firewall:
+
+[source,java]
+--------------------------------------------------------------------------------------
+AWSCredentials awsCredentials = new BasicAWSCredentials("myAccessKey", "mySecretKey");
+ClientConfiguration clientConfiguration = new ClientConfiguration();
+clientConfiguration.setProxyHost("http://myProxyHost");
+clientConfiguration.setProxyPort(8080);
+
+AmazonDynamoDB client = new AmazonDynamoDBClient(awsCredentials, clientConfiguration);
+
+registry.bind("client", client);
+--------------------------------------------------------------------------------------
+
+== Supported producer operations
+
+- BatchGetItems
+- DeleteItem
+- DeleteTable
+- DescribeTable
+- GetItem
+- PutItem
+- Query
+- Scan
+- UpdateItem
+- UpdateTable
+
+== Automatic detection of AmazonDynamoDB client in registry
+
+The component is capable of detecting the presence of an AmazonDynamoDB bean into the registry.
+If it's the only instance of that type it will be used as client and you won't have to define it as uri parameter.
+This may be really useful for smarter configuration of the endpoint.
+
+== Dependencies
+
+Maven users will need to add the following dependency to their pom.xml.
+
+*pom.xml*
+
+[source,xml]
+---------------------------------------
+<dependency>
+    <groupId>org.apache.camel</groupId>
+    <artifactId>camel-aws-ddb</artifactId>
+    <version>${camel-version}</version>
+</dependency>
+---------------------------------------
+
+where `$\{camel-version\}` must be replaced by the actual version of Camel.
+
diff --git a/components/camel-aws2-ddb/src/main/docs/aws2-ddbstream-component.adoc b/components/camel-aws2-ddb/src/main/docs/aws2-ddbstream-component.adoc
new file mode 100644
index 0000000..4f9f0d5
--- /dev/null
+++ b/components/camel-aws2-ddb/src/main/docs/aws2-ddbstream-component.adoc
@@ -0,0 +1,252 @@
+[[aws-ddbstream-component]]
+= AWS DynamoDB Streams Component
+
+*Since Camel 2.17*
+
+// HEADER START
+*Only consumer is supported*
+// HEADER END
+
+The DynamoDB Stream component supports receiving messages from
+Amazon DynamoDB Stream service.
+
+Prerequisites
+
+You must have a valid Amazon Web Services developer account, and be
+signed up to use Amazon DynamoDB Streams. More information are available
+at https://aws.amazon.com/dynamodb/[AWS DynamoDB]
+
+== URI Format
+
+[source,java]
+------------------------------------
+aws-ddbstream://table-name[?options]
+------------------------------------
+
+The stream needs to be created prior to it being used. +
+ You can append query options to the URI in the following format,
+?options=value&option2=value&...
+
+== URI Options
+
+
+// component options: START
+The AWS DynamoDB Streams component supports 6 options, which are listed below.
+
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *accessKey* (consumer) | Amazon AWS Access Key |  | String
+| *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean
+| *region* (consumer) | Amazon AWS Region |  | String
+| *secretKey* (consumer) | Amazon AWS Secret Key |  | String
+| *basicPropertyBinding* (advanced) | Whether the component should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean
+| *configuration* (advanced) | The AWS DDB stream default configuration |  | DdbStreamConfiguration
+|===
+// component options: END
+
+
+
+
+
+
+// endpoint options: START
+The AWS DynamoDB Streams endpoint is configured using URI syntax:
+
+----
+aws-ddbstream:tableName
+----
+
+with the following path and query parameters:
+
+=== Path Parameters (1 parameters):
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *tableName* | *Required* Name of the dynamodb table |  | String
+|===
+
+
+=== Query Parameters (31 parameters):
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *amazonDynamoDbStreamsClient* (consumer) | Amazon DynamoDB client to use for all requests for this endpoint |  | AmazonDynamoDBStreams
+| *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean
+| *iteratorType* (consumer) | Defines where in the DynaboDB stream to start getting records. Note that using TRIM_HORIZON can cause a significant delay before the stream has caught up to real-time. if {AT,AFTER}_SEQUENCE_NUMBER are used, then a sequenceNumberProvider MUST be supplied. The value can be one of: TRIM_HORIZON, LATEST, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER | LATEST | ShardIteratorType
+| *maxResultsPerRequest* (consumer) | Maximum number of records that will be fetched in each poll |  | int
+| *proxyHost* (consumer) | To define a proxy host when instantiating the DDBStreams client |  | String
+| *proxyPort* (consumer) | To define a proxy port when instantiating the DDBStreams client |  | Integer
+| *proxyProtocol* (consumer) | To define a proxy protocol when instantiating the DDBStreams client. The value can be one of: HTTP, HTTPS | HTTPS | Protocol
+| *region* (consumer) | The region in which DDBStreams client needs to work |  | String
+| *sendEmptyMessageWhenIdle* (consumer) | If the polling consumer did not poll any files, you can enable this option to send an empty message (no body) instead. | false | boolean
+| *sequenceNumberProvider* (consumer) | Provider for the sequence number when using one of the two ShardIteratorType.{AT,AFTER}_SEQUENCE_NUMBER iterator types. Can be a registry reference or a literal sequence number. |  | SequenceNumberProvider
+| *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
+| *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. The value can be one of: InOnly, InOut, InOptionalOut |  | ExchangePattern
+| *pollStrategy* (consumer) | A pluggable org.apache.camel.PollingConsumerPollingStrategy allowing you to provide your custom implementation to control error handling usually occurred during the poll operation before an Exchange have been created and being routed in Camel. |  | PollingConsumerPollStrategy
+| *basicPropertyBinding* (advanced) | Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean
+| *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean
+| *backoffErrorThreshold* (scheduler) | The number of subsequent error polls (failed due some error) that should happen before the backoffMultipler should kick-in. |  | int
+| *backoffIdleThreshold* (scheduler) | The number of subsequent idle polls that should happen before the backoffMultipler should kick-in. |  | int
+| *backoffMultiplier* (scheduler) | To let the scheduled polling consumer backoff if there has been a number of subsequent idles/errors in a row. The multiplier is then the number of polls that will be skipped before the next actual attempt is happening again. When this option is in use then backoffIdleThreshold and/or backoffErrorThreshold must also be configured. |  | int
+| *delay* (scheduler) | Milliseconds before the next poll. You can also specify time values using units, such as 60s (60 seconds), 5m30s (5 minutes and 30 seconds), and 1h (1 hour). | 500 | long
+| *greedy* (scheduler) | If greedy is enabled, then the ScheduledPollConsumer will run immediately again, if the previous run polled 1 or more messages. | false | boolean
+| *initialDelay* (scheduler) | Milliseconds before the first poll starts. You can also specify time values using units, such as 60s (60 seconds), 5m30s (5 minutes and 30 seconds), and 1h (1 hour). | 1000 | long
+| *repeatCount* (scheduler) | Specifies a maximum limit of number of fires. So if you set it to 1, the scheduler will only fire once. If you set it to 5, it will only fire five times. A value of zero or negative means fire forever. | 0 | long
+| *runLoggingLevel* (scheduler) | The consumer logs a start/complete log line when it polls. This option allows you to configure the logging level for that. The value can be one of: TRACE, DEBUG, INFO, WARN, ERROR, OFF | TRACE | LoggingLevel
+| *scheduledExecutorService* (scheduler) | Allows for configuring a custom/shared thread pool to use for the consumer. By default each consumer has its own single threaded thread pool. |  | ScheduledExecutorService
+| *scheduler* (scheduler) | To use a cron scheduler from either camel-spring or camel-quartz component. The value can be one of: none, spring, quartz | none | String
+| *schedulerProperties* (scheduler) | To configure additional properties when using a custom scheduler or any of the Quartz, Spring based scheduler. |  | Map
+| *startScheduler* (scheduler) | Whether the scheduler should be auto started. | true | boolean
+| *timeUnit* (scheduler) | Time unit for initialDelay and delay options. The value can be one of: NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS | MILLISECONDS | TimeUnit
+| *useFixedDelay* (scheduler) | Controls if fixed delay or fixed rate is used. See ScheduledExecutorService in JDK for details. | true | boolean
+| *accessKey* (security) | Amazon AWS Access Key |  | String
+| *secretKey* (security) | Amazon AWS Secret Key |  | String
+|===
+// endpoint options: END
+// spring-boot-auto-configure options: START
+== Spring Boot Auto-Configuration
+
+When using Spring Boot make sure to use the following Maven dependency to have support for auto configuration:
+
+[source,xml]
+----
+<dependency>
+  <groupId>org.apache.camel.springboot</groupId>
+  <artifactId>camel-aws-ddb-starter</artifactId>
+  <version>x.x.x</version>
+  <!-- use the same version as your Camel core version -->
+</dependency>
+----
+
+
+The component supports 18 options, which are listed below.
+
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *camel.component.aws-ddbstream.access-key* | Amazon AWS Access Key |  | String
+| *camel.component.aws-ddbstream.basic-property-binding* | Whether the component should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | Boolean
+| *camel.component.aws-ddbstream.bridge-error-handler* | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | Boolean
+| *camel.component.aws-ddbstream.configuration.access-key* | Amazon AWS Access Key |  | String
+| *camel.component.aws-ddbstream.configuration.amazon-dynamo-db-streams-client* | Amazon DynamoDB client to use for all requests for this endpoint |  | AmazonDynamoDBStreams
+| *camel.component.aws-ddbstream.configuration.iterator-type* | Defines where in the DynaboDB stream to start getting records. Note that using TRIM_HORIZON can cause a significant delay before the stream has caught up to real-time. if {AT,AFTER}_SEQUENCE_NUMBER are used, then a sequenceNumberProvider MUST be supplied. |  | ShardIteratorType
+| *camel.component.aws-ddbstream.configuration.max-results-per-request* | Maximum number of records that will be fetched in each poll |  | Integer
+| *camel.component.aws-ddbstream.configuration.proxy-host* | To define a proxy host when instantiating the DDBStreams client |  | String
+| *camel.component.aws-ddbstream.configuration.proxy-port* | To define a proxy port when instantiating the DDBStreams client |  | Integer
+| *camel.component.aws-ddbstream.configuration.proxy-protocol* | To define a proxy protocol when instantiating the DDBStreams client |  | Protocol
+| *camel.component.aws-ddbstream.configuration.region* | The region in which DDBStreams client needs to work |  | String
+| *camel.component.aws-ddbstream.configuration.secret-key* | Amazon AWS Secret Key |  | String
+| *camel.component.aws-ddbstream.configuration.sequence-number-provider* | Provider for the sequence number when using one of the two ShardIteratorType.{AT,AFTER}_SEQUENCE_NUMBER iterator types. Can be a registry reference or a literal sequence number. |  | SequenceNumberProvider
+| *camel.component.aws-ddbstream.configuration.table-name* | Name of the dynamodb table |  | String
+| *camel.component.aws-ddbstream.enabled* | Whether to enable auto configuration of the aws-ddbstream component. This is enabled by default. |  | Boolean
+| *camel.component.aws-ddbstream.lazy-start-producer* | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is proces [...]
+| *camel.component.aws-ddbstream.region* | Amazon AWS Region |  | String
+| *camel.component.aws-ddbstream.secret-key* | Amazon AWS Secret Key |  | String
+|===
+// spring-boot-auto-configure options: END
+
+
+
+
+
+
+Required DynampDBStream component options
+
+You have to provide the amazonDynamoDbStreamsClient in the
+Registry with proxies and relevant credentials
+configured.
+
+== Sequence Numbers
+
+You can provide a literal string as the sequence number or provide a
+bean in the registry. An example of using the bean would be to save your
+current position in the change feed and restore it on Camel startup.
+
+It is an error to provide a sequence number that is greater than the
+largest sequence number in the describe-streams result, as this will
+lead to the AWS call returning an HTTP 400.
+
+== Batch Consumer
+
+This component implements the Batch Consumer.
+
+This allows you for instance to know how many messages exists in this
+batch and for instance let the Aggregator
+aggregate this number of messages.
+
+== Usage
+
+=== AmazonDynamoDBStreamsClient configuration
+
+You will need to create an instance of AmazonDynamoDBStreamsClient and
+bind it to the registry
+
+[source,java]
+--------------------------------------------------------------------------------------------------------------------
+ClientConfiguration clientConfiguration = new ClientConfiguration();
+clientConfiguration.setProxyHost("http://myProxyHost");
+clientConfiguration.setProxyPort(8080);
+
+Region region = Region.getRegion(Regions.fromName(region));
+region.createClient(AmazonDynamoDBStreamsClient.class, null, clientConfiguration);
+// the 'null' here is the AWSCredentialsProvider which defaults to an instance of DefaultAWSCredentialsProviderChain
+
+registry.bind("kinesisClient", client);
+--------------------------------------------------------------------------------------------------------------------
+
+=== Providing AWS Credentials
+
+It is recommended that the credentials are obtained by using the
+http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html[DefaultAWSCredentialsProviderChain]
+that is the default when creating a new ClientConfiguration instance,
+however, a
+different http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html[AWSCredentialsProvider]
+can be specified when calling createClient(...).
+
+== Coping with Downtime
+
+=== AWS DynamoDB Streams outage of less than 24 hours
+
+The consumer will resume from the last seen sequence number (as
+implemented
+for https://issues.apache.org/jira/browse/CAMEL-9515[CAMEL-9515]), so
+you should receive a flood of events in quick succession, as long as the
+outage did not also include DynamoDB itself.
+
+=== AWS DynamoDB Streams outage of more than 24 hours
+
+Given that AWS only retain 24 hours worth of changes, you will have
+missed change events no matter what mitigations are in place.
+
+== Automatic detection of AmazonDynamoDBStreams client in registry
+
+The component is capable of detecting the presence of an AmazonDynamoDBStreams bean into the registry.
+If it's the only instance of that type it will be used as client and you won't have to define it as uri parameter.
+This may be really useful for smarter configuration of the endpoint.
+
+== Dependencies
+
+Maven users will need to add the following dependency to their pom.xml.
+
+*pom.xml*
+
+[source,xml]
+---------------------------------------
+<dependency>
+    <groupId>org.apache.camel</groupId>
+    <artifactId>camel-aws</artifactId>
+    <version>${camel-version}</version>
+</dependency>
+---------------------------------------
+
+where `$\{camel-version\}` must be replaced by the actual version of Camel.
+
diff --git a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/AbstractDdbCommand.java b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/AbstractDdbCommand.java
new file mode 100644
index 0000000..533329a
--- /dev/null
+++ b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/AbstractDdbCommand.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.ExpectedAttributeValue;
+
+public abstract class AbstractDdbCommand {
+    protected Ddb2Configuration configuration;
+    protected Exchange exchange;
+    protected DynamoDbClient ddbClient;
+
+    public AbstractDdbCommand(DynamoDbClient ddbClient,
+                              Ddb2Configuration configuration, Exchange exchange) {
+
+        this.ddbClient = ddbClient;
+        this.configuration = configuration;
+        this.exchange = exchange;
+    }
+
+    public abstract void execute();
+
+    protected Message getMessageForResponse(Exchange exchange) {
+        return exchange.getMessage();
+    }
+
+    protected String determineTableName() {
+        String tableName = exchange.getIn().getHeader(Ddb2Constants.TABLE_NAME, String.class);
+        return tableName != null ? tableName : configuration.getTableName();
+    }
+
+    @SuppressWarnings("unchecked")
+    protected Map<String, ExpectedAttributeValue> determineUpdateCondition() {
+        return exchange.getIn().getHeader(Ddb2Constants.UPDATE_CONDITION, Map.class);
+    }
+
+    @SuppressWarnings("unchecked")
+    protected Map<String, AttributeValue> determineItem() {
+        return exchange.getIn().getHeader(Ddb2Constants.ITEM, Map.class);
+    }
+
+    protected String determineReturnValues() {
+        return exchange.getIn().getHeader(Ddb2Constants.RETURN_VALUES, String.class);
+    }
+
+    protected void addAttributesToResult(Map<String, AttributeValue> attributes) {
+        Message msg = getMessageForResponse(exchange);
+        msg.setHeader(Ddb2Constants.ATTRIBUTES, attributes);
+    }
+    
+    protected void addToResults(Map<String, Object> map) {
+        Message msg = getMessageForResponse(exchange);
+        for (Map.Entry<String, Object> en : map.entrySet()) {
+            msg.setHeader(en.getKey(), en.getValue());
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    protected Map<String, AttributeValue> determineKey() {
+        return exchange.getIn().getHeader(Ddb2Constants.KEY, Map.class);
+    }
+
+    @SuppressWarnings("unchecked")
+    protected Collection<String> determineAttributeNames() {
+        return exchange.getIn().getHeader(Ddb2Constants.ATTRIBUTE_NAMES, Collection.class);
+    }
+
+    protected Boolean determineConsistentRead() {
+        return exchange.getIn().getHeader(Ddb2Constants.CONSISTENT_READ, configuration.isConsistentRead(), Boolean.class);
+    }
+
+    @SuppressWarnings("unchecked")
+    protected Map<String, AttributeValue> determineExclusiveStartKey() {
+        return exchange.getIn().getHeader(Ddb2Constants.START_KEY, Map.class);
+    }
+
+    protected Integer determineLimit() {
+        return exchange.getIn().getHeader(Ddb2Constants.LIMIT, Integer.class);
+    }
+}
diff --git a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/BatchGetItemsCommand.java b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/BatchGetItemsCommand.java
new file mode 100644
index 0000000..37f3391
--- /dev/null
+++ b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/BatchGetItemsCommand.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.BatchGetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.BatchGetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.KeysAndAttributes;
+
+public class BatchGetItemsCommand extends AbstractDdbCommand {
+
+    public BatchGetItemsCommand(DynamoDbClient ddbClient, Ddb2Configuration configuration, Exchange exchange) {
+        super(ddbClient, configuration, exchange);
+    }
+
+    @Override
+    public void execute() {
+        BatchGetItemResponse result = ddbClient.batchGetItem(
+                BatchGetItemRequest.builder().requestItems(determineBatchItems()).build());
+
+        Map tmp = new HashMap<>();
+        tmp.put(Ddb2Constants.BATCH_RESPONSE, result.responses());
+        tmp.put(Ddb2Constants.UNPROCESSED_KEYS, result.unprocessedKeys());
+        addToResults(tmp);        
+    }
+
+    @SuppressWarnings("unchecked")
+    private Map<String, KeysAndAttributes> determineBatchItems() {
+        return exchange.getIn().getHeader(Ddb2Constants.BATCH_ITEMS, Map.class);
+    }
+}
+
diff --git a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/Ddb2Component.java b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/Ddb2Component.java
new file mode 100644
index 0000000..6a25f7d
--- /dev/null
+++ b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/Ddb2Component.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.annotations.Component;
+import org.apache.camel.support.DefaultComponent;
+
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+
+@Component("aws2-ddb")
+public class Ddb2Component extends DefaultComponent {
+
+    @Metadata
+    private String accessKey;
+    @Metadata
+    private String secretKey;
+    @Metadata
+    private String region;
+    @Metadata(label = "advanced")    
+    private Ddb2Configuration configuration;
+    
+    public Ddb2Component() {
+        this(null);
+    }
+
+    public Ddb2Component(CamelContext context) {
+        super(context);
+        
+        registerExtension(new Ddb2ComponentVerifierExtension());
+    }
+
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+
+        if (remaining == null || remaining.trim().length() == 0) {
+            throw new IllegalArgumentException("Table name must be specified.");
+        }
+        Ddb2Configuration configuration = this.configuration != null ? this.configuration.copy() : new Ddb2Configuration();
+        configuration.setTableName(remaining);
+        Ddb2Endpoint endpoint = new Ddb2Endpoint(uri, this, configuration);
+        endpoint.getConfiguration().setAccessKey(accessKey);
+        endpoint.getConfiguration().setSecretKey(secretKey);
+        endpoint.getConfiguration().setRegion(region);
+        setProperties(endpoint, parameters);
+        checkAndSetRegistryClient(configuration);
+        if (configuration.getAmazonDDBClient() == null && (configuration.getAccessKey() == null || configuration.getSecretKey() == null)) {
+            throw new IllegalArgumentException("amazonDDBClient or accessKey and secretKey must be specified");
+        }
+
+        return endpoint;
+    }
+    
+    public Ddb2Configuration getConfiguration() {
+        return configuration;
+    }
+
+    /**
+     * The AWS DDB default configuration
+     */
+    public void setConfiguration(Ddb2Configuration configuration) {
+        this.configuration = configuration;
+    }
+
+    public String getAccessKey() {
+        return accessKey;
+    }
+    
+    /**
+     * Amazon AWS Access Key
+     */
+    public void setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+    }
+
+    public String getSecretKey() {
+        return secretKey;
+    }
+
+    /**
+     * Amazon AWS Secret Key
+     */
+    public void setSecretKey(String secretKey) {
+        this.secretKey = secretKey;
+    }
+    
+    /**
+     * The region in which DDB client needs to work
+     */
+    public String getRegion() {
+        return region;
+    }
+
+    public void setRegion(String region) {
+        this.region = region;
+    }
+    
+    private void checkAndSetRegistryClient(Ddb2Configuration configuration) {
+        Set<DynamoDbClient> clients = getCamelContext().getRegistry().findByType(DynamoDbClient.class);
+        if (clients.size() == 1) {
+            configuration.setAmazonDDBClient(clients.stream().findFirst().get());
+        }
+    }
+}
diff --git a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/Ddb2ComponentVerifierExtension.java b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/Ddb2ComponentVerifierExtension.java
new file mode 100644
index 0000000..2734221
--- /dev/null
+++ b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/Ddb2ComponentVerifierExtension.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+import java.util.Map;
+
+import org.apache.camel.component.extension.verifier.DefaultComponentVerifierExtension;
+import org.apache.camel.component.extension.verifier.ResultBuilder;
+import org.apache.camel.component.extension.verifier.ResultErrorBuilder;
+import org.apache.camel.component.extension.verifier.ResultErrorHelper;
+
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClientBuilder;
+
+public class Ddb2ComponentVerifierExtension extends DefaultComponentVerifierExtension {
+
+    public Ddb2ComponentVerifierExtension() {
+        this("aws2-ddb");
+    }
+
+    public Ddb2ComponentVerifierExtension(String scheme) {
+        super(scheme);
+    }
+
+    // *********************************
+    // Parameters validation
+    // *********************************
+
+
+    /**
+     * Basic check of the parameters (they are not empty)
+     *
+     * @param parameters
+     * @return
+     */
+    @Override
+    protected Result verifyParameters(Map<String, Object> parameters) {
+
+        ResultBuilder builder = ResultBuilder.withStatusAndScope(Result.Status.OK, Scope.PARAMETERS).error(ResultErrorHelper.requiresOption("accessKey", parameters))
+                .error(ResultErrorHelper.requiresOption("secretKey", parameters)).error(ResultErrorHelper.requiresOption("region", parameters));
+
+        // Validate using the catalog
+
+        super.verifyParametersAgainstCatalog(builder, parameters);
+
+        return builder.build();
+    }
+
+    // *********************************
+    // Connectivity validation
+    // *********************************
+
+    /**
+     * To verify the connectivity, we will try a basic test connection to extract the 
+     * list of tables and see if it fails
+     *
+     * @param parameters
+     * @return
+     */
+    @Override
+    protected Result verifyConnectivity(Map<String, Object> parameters) {
+        ResultBuilder builder = ResultBuilder.withStatusAndScope(Result.Status.OK, Scope.CONNECTIVITY);
+
+        try {
+            Ddb2Configuration configuration = setProperties(new Ddb2Configuration(), parameters);
+            AwsBasicCredentials cred = AwsBasicCredentials.create(configuration.getAccessKey(), configuration.getSecretKey());
+            DynamoDbClientBuilder clientBuilder = DynamoDbClient.builder();
+            DynamoDbClient client = clientBuilder.credentialsProvider(StaticCredentialsProvider.create(cred)).region(Region.of(configuration.getRegion())).build();
+            client.listTables();
+        } catch (SdkClientException e) {
+            ResultErrorBuilder errorBuilder = ResultErrorBuilder.withCodeAndDescription(VerificationError.StandardCode.AUTHENTICATION, e.getMessage())
+                    .detail("aws_ddb_exception_message", e.getMessage()).detail(VerificationError.ExceptionAttribute.EXCEPTION_CLASS, e.getClass().getName())
+                    .detail(VerificationError.ExceptionAttribute.EXCEPTION_INSTANCE, e);
+
+            builder.error(errorBuilder.build());
+        } catch (Exception e) {
+            builder.error(ResultErrorBuilder.withException(e).build());
+        }
+        return builder.build();
+    }
+}
diff --git a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/Ddb2Configuration.java b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/Ddb2Configuration.java
new file mode 100644
index 0000000..17d6b38
--- /dev/null
+++ b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/Ddb2Configuration.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
+import org.apache.camel.spi.UriPath;
+
+import software.amazon.awssdk.core.Protocol;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+
+@UriParams
+public class Ddb2Configuration implements Cloneable {
+
+    @UriPath @Metadata(required = true)
+    private String tableName;
+    @UriParam(label = "security", secret = true)
+    private String accessKey;
+    @UriParam(label = "security", secret = true)
+    private String secretKey;
+    @UriParam
+    private DynamoDbClient amazonDDBClient;
+    @UriParam
+    private boolean consistentRead;
+    @UriParam(defaultValue = "PutItem")
+    private Ddb2Operations operation = Ddb2Operations.PutItem;
+    @UriParam
+    private Long readCapacity;
+    @UriParam
+    private Long writeCapacity;
+    @UriParam
+    private String keyAttributeName;
+    @UriParam
+    private String keyAttributeType;
+    @UriParam(enums = "HTTP,HTTPS", defaultValue = "HTTPS")
+    private Protocol proxyProtocol = Protocol.HTTPS;
+    @UriParam
+    private String proxyHost;
+    @UriParam
+    private Integer proxyPort;
+    @UriParam
+    private String region;
+
+    public String getAccessKey() {
+        return accessKey;
+    }
+
+    /**
+     * Amazon AWS Access Key
+     */
+    public void setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+    }
+
+    public String getSecretKey() {
+        return secretKey;
+    }
+
+    /**
+     * Amazon AWS Secret Key
+     */
+    public void setSecretKey(String secretKey) {
+        this.secretKey = secretKey;
+    }
+
+    public DynamoDbClient getAmazonDDBClient() {
+        return amazonDDBClient;
+    }
+
+    /**
+     * To use the AmazonDynamoDB as the client
+     */
+    public void setAmazonDDBClient(DynamoDbClient amazonDDBClient) {
+        this.amazonDDBClient = amazonDDBClient;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    /**
+     * The name of the table currently worked with.
+     */
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
+    }
+
+    public Ddb2Operations getOperation() {
+        return operation;
+    }
+
+    /**
+     * What operation to perform
+     */
+    public void setOperation(Ddb2Operations operation) {
+        this.operation = operation;
+    }
+
+    public boolean isConsistentRead() {
+        return consistentRead;
+    }
+
+    /**
+     * Determines whether or not strong consistency should be enforced when data is read.
+     */
+    public void setConsistentRead(boolean consistentRead) {
+        this.consistentRead = consistentRead;
+    }
+
+    public Long getReadCapacity() {
+        return readCapacity;
+    }
+
+    /**
+     * The provisioned throughput to reserve for reading resources from your table
+     */
+    public void setReadCapacity(Long readCapacity) {
+        this.readCapacity = readCapacity;
+    }
+
+    public Long getWriteCapacity() {
+        return writeCapacity;
+    }
+
+    /**
+     * The provisioned throughput to reserved for writing resources to your table
+     */
+    public void setWriteCapacity(Long writeCapacity) {
+        this.writeCapacity = writeCapacity;
+    }
+
+    public String getKeyAttributeName() {
+        return keyAttributeName;
+    }
+
+    /**
+     * Attribute name when creating table
+     */
+    public void setKeyAttributeName(String keyAttributeName) {
+        this.keyAttributeName = keyAttributeName;
+    }
+
+    public String getKeyAttributeType() {
+        return keyAttributeType;
+    }
+
+    /**
+     * Attribute type when creating table
+     */
+    public void setKeyAttributeType(String keyAttributeType) {
+        this.keyAttributeType = keyAttributeType;
+    }
+    
+    public Protocol getProxyProtocol() {
+        return proxyProtocol;
+    }
+
+    /**
+     * To define a proxy protocol when instantiating the DDB client
+     */
+    public void setProxyProtocol(Protocol proxyProtocol) {
+        this.proxyProtocol = proxyProtocol;
+    }
+    
+    public String getProxyHost() {
+        return proxyHost;
+    }
+
+    /**
+     * To define a proxy host when instantiating the DDB client
+     */
+    public void setProxyHost(String proxyHost) {
+        this.proxyHost = proxyHost;
+    }
+
+    public Integer getProxyPort() {
+        return proxyPort;
+    }
+
+    /**
+     * To define a proxy port when instantiating the DDB client. When using this parameter, the configuration will expect the capitalized name of the region (for example AP_EAST_1)
+     * You'll need to use the name Regions.EU_WEST_1.name()
+     */
+    public void setProxyPort(Integer proxyPort) {
+        this.proxyPort = proxyPort;
+    }
+
+    public String getRegion() {
+        return region;
+    }
+
+    /**
+     * The region in which DDB client needs to work
+     */
+    public void setRegion(String region) {
+        this.region = region;
+    }
+    
+    // *************************************************
+    //
+    // *************************************************
+
+    public Ddb2Configuration copy() {
+        try {
+            return (Ddb2Configuration)super.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeCamelException(e);
+        }
+    }
+}
diff --git a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/Ddb2Constants.java b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/Ddb2Constants.java
new file mode 100644
index 0000000..dc0eb3c
--- /dev/null
+++ b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/Ddb2Constants.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+/**
+ * Constants used in Camel AWS DynamoDB component
+ */
+public interface Ddb2Constants {
+    String ATTRIBUTES = "CamelAwsDdbAttributes";
+    String ATTRIBUTE_NAMES = "CamelAwsDdbAttributeNames";
+    String BATCH_ITEMS = "CamelAwsDdbBatchItems";
+    String BATCH_RESPONSE = "CamelAwsDdbBatchResponse";
+    String CONSISTENT_READ = "CamelAwsDdbConsistentRead";
+    String CONSUMED_CAPACITY = "CamelAwsDdbConsumedCapacity";
+    String COUNT = "CamelAwsDdbCount";
+    String CREATION_DATE = "CamelAwsDdbCreationDate";
+    // Removed from DynamoDB v1 to v2
+    // String EXACT_COUNT = "CamelAwsDdbExactCount";
+    // Removed from DynamoDB v1 to v2
+    // String HASH_KEY_VALUE = "CamelAwsDdbHashKeyValue";
+    // Added INDEX_NAME for querying secondary indexes
+    String INDEX_NAME = "CamelAwsDdbIndexName";
+    String ITEM = "CamelAwsDdbItem";
+    String ITEMS = "CamelAwsDdbItems";
+    String ITEM_COUNT = "CamelAwsDdbTableItemCount";
+    String ITEM_NAME = "CamelAwsDdbItemName";
+    String MESSAGE_ID = "CamelAwsDdbMessageId";
+    String NEXT_TOKEN = "CamelAwsDdbNextToken";
+    String KEY = "CamelAwsDdbKey";
+    // Added from DynamoDB v1 to v2
+    String KEY_CONDITIONS = "CamelAwsDdbKeyConditions";
+    String KEY_SCHEMA = "CamelAwsDdbKeySchema";
+    String LAST_EVALUATED_KEY = "CamelAwsDdbLastEvaluatedKey";
+    String LIMIT = "CamelAwsDdbLimit";
+    String OPERATION = "CamelAwsDdbOperation";
+    String PROVISIONED_THROUGHPUT = "CamelAwsDdbProvisionedThroughput";
+    String READ_CAPACITY = "CamelAwsDdbReadCapacity";
+    String RETURN_VALUES = "CamelAwsDdbReturnValues";
+    String SCANNED_COUNT = "CamelAwsDdbScannedCount";
+    String SCAN_INDEX_FORWARD = "CamelAwsDdbScanIndexForward";
+    // Removed from DynamoDB v1 to v2
+    // String SCAN_RANGE_KEY_CONDITION = "CamelAwsDdbScanRangeKeyCondition";
+    String SCAN_FILTER = "CamelAwsDdbScanFilter";
+    String START_KEY = "CamelAwsDdbStartKey";
+    String TABLE_NAME = "CamelAwsDdbTableName";
+    String TABLE_SIZE = "CamelAwsDdbTableSize";
+    String TABLE_STATUS = "CamelAwsDdbTableStatus";
+    String UPDATE_CONDITION = "CamelAwsDdbUpdateCondition";
+    String UPDATE_VALUES = "CamelAwsDdbUpdateValues";
+    String UNPROCESSED_KEYS = "CamelAwsDdbUnprocessedKeys";
+    String WRITE_CAPACITY = "CamelAwsDdbWriteCapacity";
+}
diff --git a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/Ddb2Endpoint.java b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/Ddb2Endpoint.java
new file mode 100644
index 0000000..532e455
--- /dev/null
+++ b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/Ddb2Endpoint.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+import java.net.URI;
+
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.support.ScheduledPollEndpoint;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.http.apache.ProxyConfiguration;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClientBuilder;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.TableDescription;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+
+/**
+ * The aws-ddb component is used for storing and retrieving data from Amazon's DynamoDB service.
+ */
+@UriEndpoint(firstVersion = "3.1.0", scheme = "aws2-ddb", title = "AWS 2 DynamoDB", syntax = "aws2-ddb:tableName", producerOnly = true, label = "cloud,database,nosql")
+public class Ddb2Endpoint extends ScheduledPollEndpoint {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Ddb2Endpoint.class);
+
+    @UriParam
+    private Ddb2Configuration configuration;
+
+    private DynamoDbClient ddbClient;
+
+    public Ddb2Endpoint(String uri, Component component, Ddb2Configuration configuration) {
+        super(uri, component);
+        this.configuration = configuration;
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        throw new UnsupportedOperationException("You cannot receive messages from this endpoint");
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        return new Ddb2Producer(this);
+    }
+
+    @Override
+    public void doStart() throws Exception {
+        super.doStart();
+
+        ddbClient = configuration.getAmazonDDBClient() != null ? configuration.getAmazonDDBClient()
+            : createDdbClient();
+        
+        String tableName = getConfiguration().getTableName();
+        LOG.trace("Querying whether table [{}] already exists...", tableName);
+
+        try {
+            DescribeTableRequest.Builder request = DescribeTableRequest.builder().tableName(tableName);
+            TableDescription tableDescription = ddbClient.describeTable(request.build()).table();
+            if (!isTableActive(tableDescription)) {
+                waitForTableToBecomeAvailable(tableName);
+            }
+
+            LOG.trace("Table [{}] already exists", tableName);
+            return;
+        } catch (ResourceNotFoundException e) {
+            LOG.trace("Table [{}] doesn't exist yet", tableName);
+            LOG.trace("Creating table [{}]...", tableName);
+            TableDescription tableDescription = createTable(tableName);
+            if (!isTableActive(tableDescription)) {
+                waitForTableToBecomeAvailable(tableName);
+            }
+
+            LOG.trace("Table [{}] created", tableName);
+        }
+    }
+    
+    @Override
+    public void doStop() throws Exception {
+        if (ObjectHelper.isEmpty(configuration.getAmazonDDBClient())) {
+            if (ddbClient != null) {
+                ddbClient.close();
+            }
+        }
+        super.doStop();
+    }
+
+    private TableDescription createTable(String tableName) {
+        CreateTableRequest.Builder createTableRequest = CreateTableRequest.builder().tableName(tableName)
+                .keySchema(
+                        KeySchemaElement.builder().attributeName(
+                                configuration.getKeyAttributeName())
+                                .keyType(configuration.getKeyAttributeType()).build())
+                .provisionedThroughput(
+                        ProvisionedThroughput.builder().readCapacityUnits(configuration.getReadCapacity())
+                                .writeCapacityUnits(configuration.getWriteCapacity()).build());
+        return getDdbClient().createTable(createTableRequest.build()).tableDescription();
+    }
+
+    public Ddb2Configuration getConfiguration() {
+        return configuration;
+    }
+
+    public DynamoDbClient getDdbClient() {
+        return ddbClient;
+    }
+
+    DynamoDbClient createDdbClient() {
+    	DynamoDbClient client = null;
+    	DynamoDbClientBuilder clientBuilder = DynamoDbClient.builder();
+        ProxyConfiguration.Builder proxyConfig = null;
+        ApacheHttpClient.Builder httpClientBuilder = null;
+        boolean isClientConfigFound = false;
+        if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && ObjectHelper.isNotEmpty(configuration.getProxyPort())) {
+            proxyConfig = ProxyConfiguration.builder();
+            URI proxyEndpoint = URI.create(configuration.getProxyProtocol() + configuration.getProxyHost() + configuration.getProxyPort());
+            proxyConfig.endpoint(proxyEndpoint);
+            httpClientBuilder = ApacheHttpClient.builder().proxyConfiguration(proxyConfig.build());
+            isClientConfigFound = true;
+        }
+        if (configuration.getAccessKey() != null && configuration.getSecretKey() != null) {
+            AwsBasicCredentials cred = AwsBasicCredentials.create(configuration.getAccessKey(), configuration.getSecretKey());
+            if (isClientConfigFound) {
+                clientBuilder = clientBuilder.httpClientBuilder(httpClientBuilder).credentialsProvider(StaticCredentialsProvider.create(cred));
+            } else {
+                clientBuilder = clientBuilder.credentialsProvider(StaticCredentialsProvider.create(cred));
+            }
+        } else {
+            if (!isClientConfigFound) {
+                clientBuilder = clientBuilder.httpClientBuilder(httpClientBuilder);
+            }
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
+            clientBuilder = clientBuilder.region(Region.of(configuration.getRegion()));
+        }
+        client = clientBuilder.build();
+        return client;
+    }
+
+    private void waitForTableToBecomeAvailable(String tableName) {
+        LOG.trace("Waiting for [{}] to become ACTIVE...", tableName);
+
+        long waitTime = 5 * 60 * 1000;
+        while (waitTime > 0) {
+            try {
+                Thread.sleep(1000 * 5);
+                waitTime -= 5000;
+            } catch (Exception e) {
+            }
+            try {
+                DescribeTableRequest request = DescribeTableRequest.builder().tableName(tableName).build();
+                TableDescription tableDescription = getDdbClient().describeTable(request).table();
+                if (isTableActive(tableDescription)) {
+                    LOG.trace("Table [{}] became active", tableName);
+                    return;
+                }
+                LOG.trace("Table [{}] not active yet", tableName);
+            } catch (AwsServiceException ase) {
+                if (!ase.getMessage().contains("ResourceNotFoundException")) {
+                    throw ase;
+                }
+            }
+        }
+
+        throw new RuntimeException("Table " + tableName + " never went active");
+    }
+
+    private boolean isTableActive(TableDescription tableDescription) {
+        return tableDescription.tableStatus().equals(TableStatus.ACTIVE.toString());
+    }
+}
diff --git a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/Ddb2Operations.java b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/Ddb2Operations.java
new file mode 100644
index 0000000..a6e51f0
--- /dev/null
+++ b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/Ddb2Operations.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+public enum Ddb2Operations {
+    BatchGetItems,
+    DeleteItem,
+    DeleteTable,
+    DescribeTable,
+    GetItem,
+    PutItem,
+    Query,
+    Scan,
+    UpdateItem,
+    UpdateTable
+}
diff --git a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/Ddb2Producer.java b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/Ddb2Producer.java
new file mode 100644
index 0000000..040a29e
--- /dev/null
+++ b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/Ddb2Producer.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.util.URISupport;
+
+/**
+ * A Producer which stores data into the Amazon DynamoDB Service
+ * <a href="http://aws.amazon.com/dynamodb/">AWS DynamoDB</a>
+ */
+public class Ddb2Producer extends DefaultProducer {
+
+    private transient String ddbProducerToString;
+    
+    public Ddb2Producer(Endpoint endpoint) {
+        super(endpoint);
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        switch (determineOperation(exchange)) {
+        case BatchGetItems:
+            new BatchGetItemsCommand(getEndpoint().getDdbClient(), getConfiguration(), exchange).execute();
+            break;
+        case DeleteItem:
+            new DeleteItemCommand(getEndpoint().getDdbClient(), getConfiguration(), exchange).execute();
+            break;
+        case DeleteTable:
+            new DeleteTableCommand(getEndpoint().getDdbClient(), getConfiguration(), exchange).execute();
+            break;
+        case DescribeTable:
+            new DescribeTableCommand(getEndpoint().getDdbClient(), getConfiguration(), exchange).execute();
+            break;
+        case GetItem:
+            new GetItemCommand(getEndpoint().getDdbClient(), getConfiguration(), exchange).execute();
+            break;
+        case PutItem:
+            new PutItemCommand(getEndpoint().getDdbClient(), getConfiguration(), exchange).execute();
+            break;
+        case Query:
+            new QueryCommand(getEndpoint().getDdbClient(), getConfiguration(), exchange).execute();
+            break;
+        case Scan:
+            new ScanCommand(getEndpoint().getDdbClient(), getConfiguration(), exchange).execute();
+            break;
+        case UpdateItem:
+            new UpdateItemCommand(getEndpoint().getDdbClient(), getConfiguration(), exchange).execute();
+            break;
+        case UpdateTable:
+            new UpdateTableCommand(getEndpoint().getDdbClient(), getConfiguration(), exchange).execute();
+            break;
+        default:
+            throw new IllegalArgumentException("Unsupported operation");
+        }
+    }
+
+    private Ddb2Operations determineOperation(Exchange exchange) {
+        Ddb2Operations operation = exchange.getIn().getHeader(Ddb2Constants.OPERATION, Ddb2Operations.class);
+        return operation != null ? operation : getConfiguration().getOperation();
+    }
+
+    protected Ddb2Configuration getConfiguration() {
+        return getEndpoint().getConfiguration();
+    }
+
+    @Override
+    public String toString() {
+        if (ddbProducerToString == null) {
+            ddbProducerToString = "DdbProducer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]";
+        }
+        return ddbProducerToString;
+    }
+
+    @Override
+    public Ddb2Endpoint getEndpoint() {
+        return (Ddb2Endpoint)super.getEndpoint();
+    }
+}
\ No newline at end of file
diff --git a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/DeleteItemCommand.java b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/DeleteItemCommand.java
new file mode 100644
index 0000000..1bd5f5a
--- /dev/null
+++ b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/DeleteItemCommand.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+import org.apache.camel.Exchange;
+
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemResponse;
+
+public class DeleteItemCommand extends AbstractDdbCommand {
+
+    public DeleteItemCommand(DynamoDbClient ddbClient, Ddb2Configuration configuration, Exchange exchange) {
+        super(ddbClient, configuration, exchange);
+    }
+
+    @Override
+    public void execute() {
+        DeleteItemResponse result = ddbClient.deleteItem(DeleteItemRequest.builder()
+                .tableName(determineTableName())
+                .key(determineKey())
+                .returnValues(determineReturnValues())
+                .expected(determineUpdateCondition()).build());
+
+        addAttributesToResult(result.attributes());
+    }
+}
diff --git a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/DeleteTableCommand.java b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/DeleteTableCommand.java
new file mode 100644
index 0000000..f33ccc8
--- /dev/null
+++ b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/DeleteTableCommand.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.TableDescription;
+
+public class DeleteTableCommand extends AbstractDdbCommand {
+
+    public DeleteTableCommand(DynamoDbClient ddbClient, Ddb2Configuration configuration,
+                              Exchange exchange) {
+        super(ddbClient, configuration, exchange);
+    }
+
+    @Override
+    public void execute() {
+        TableDescription tableDescription = ddbClient
+                .deleteTable(DeleteTableRequest.builder().tableName(determineTableName()).build()).tableDescription();
+
+        Map tmp = new HashMap<>();
+        tmp.put(Ddb2Constants.PROVISIONED_THROUGHPUT, tableDescription.provisionedThroughput());
+        tmp.put(Ddb2Constants.CREATION_DATE, tableDescription.creationDateTime());
+        tmp.put(Ddb2Constants.ITEM_COUNT, tableDescription.itemCount());
+        tmp.put(Ddb2Constants.KEY_SCHEMA, tableDescription.keySchema());
+        tmp.put(Ddb2Constants.TABLE_NAME, tableDescription.tableName());
+        tmp.put(Ddb2Constants.TABLE_SIZE, tableDescription.tableSizeBytes());
+        tmp.put(Ddb2Constants.TABLE_STATUS, tableDescription.tableStatus());
+        addToResults(tmp);
+    }
+}
diff --git a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/DescribeTableCommand.java b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/DescribeTableCommand.java
new file mode 100644
index 0000000..a284451
--- /dev/null
+++ b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/DescribeTableCommand.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+
+public class DescribeTableCommand extends AbstractDdbCommand {
+
+    public DescribeTableCommand(DynamoDbClient ddbClient, Ddb2Configuration configuration,
+                                Exchange exchange) {
+        super(ddbClient, configuration, exchange);
+    }
+
+    @Override
+    public void execute() {
+        DescribeTableResponse result = ddbClient.describeTable(DescribeTableRequest.builder()
+                .tableName(determineTableName()).build());
+
+        Message msg = getMessageForResponse(exchange);
+        msg.setHeader(Ddb2Constants.TABLE_NAME, result.table().tableName());
+        msg.setHeader(Ddb2Constants.TABLE_STATUS, result.table().tableStatus());
+        msg.setHeader(Ddb2Constants.CREATION_DATE, result.table().creationDateTime());
+        msg.setHeader(Ddb2Constants.ITEM_COUNT, result.table().itemCount());
+        msg.setHeader(Ddb2Constants.KEY_SCHEMA, result.table().keySchema());
+        msg.setHeader(Ddb2Constants.READ_CAPACITY,
+                result.table().provisionedThroughput().readCapacityUnits());
+        msg.setHeader(Ddb2Constants.WRITE_CAPACITY,
+                result.table().provisionedThroughput().writeCapacityUnits());
+        msg.setHeader(Ddb2Constants.TABLE_SIZE, result.table().tableSizeBytes());
+    }
+}
diff --git a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/GetItemCommand.java b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/GetItemCommand.java
new file mode 100644
index 0000000..b7e7861
--- /dev/null
+++ b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/GetItemCommand.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+import org.apache.camel.Exchange;
+
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+
+public class GetItemCommand extends AbstractDdbCommand {
+
+    public GetItemCommand(DynamoDbClient ddbClient, Ddb2Configuration configuration, Exchange exchange) {
+        super(ddbClient, configuration, exchange);
+    }
+
+    @Override
+    public void execute() {
+        GetItemResponse result = ddbClient.getItem(GetItemRequest.builder()
+                .key(determineKey())
+                .tableName(determineTableName())
+                .attributesToGet(determineAttributeNames())
+                .consistentRead(determineConsistentRead()).build());
+        addAttributesToResult(result.item());
+    }
+}
diff --git a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/PutItemCommand.java b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/PutItemCommand.java
new file mode 100644
index 0000000..34e841a
--- /dev/null
+++ b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/PutItemCommand.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+import org.apache.camel.Exchange;
+
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.PutItemResponse;
+
+public class PutItemCommand extends AbstractDdbCommand {
+
+    public PutItemCommand(DynamoDbClient ddbClient, Ddb2Configuration configuration, Exchange exchange) {
+        super(ddbClient, configuration, exchange);
+    }
+
+    @Override
+    public void execute() {
+        PutItemResponse result = ddbClient.putItem(PutItemRequest.builder()
+                .tableName(determineTableName())
+                .item(determineItem())
+                .expected(determineUpdateCondition())
+                .returnValues(determineReturnValues()).build());
+
+        addAttributesToResult(result.attributes());
+    }
+}
diff --git a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/QueryCommand.java b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/QueryCommand.java
new file mode 100644
index 0000000..3959fe7
--- /dev/null
+++ b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/QueryCommand.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
+
+public class QueryCommand extends AbstractDdbCommand {
+
+    public QueryCommand(DynamoDbClient ddbClient, Ddb2Configuration configuration, Exchange exchange) {
+        super(ddbClient, configuration, exchange);
+    }
+
+    @Override
+    public void execute() {
+        QueryRequest.Builder query = QueryRequest.builder()
+                .tableName(determineTableName())
+                .attributesToGet(determineAttributeNames())
+                .consistentRead(determineConsistentRead())
+                .exclusiveStartKey(determineStartKey())
+                .keyConditions(determineKeyConditions())
+                .exclusiveStartKey(determineStartKey())
+                .limit(determineLimit())
+                .scanIndexForward(determineScanIndexForward());
+        
+        // Check if we have set an Index Name
+        if (exchange.getIn().getHeader(Ddb2Constants.INDEX_NAME, String.class) != null) {
+            query.indexName(exchange.getIn().getHeader(Ddb2Constants.INDEX_NAME, String.class));
+        }
+        
+        QueryResponse result = ddbClient.query(query.build());
+        
+        Map tmp = new HashMap<>();
+        tmp.put(Ddb2Constants.ITEMS, result.items());
+        tmp.put(Ddb2Constants.LAST_EVALUATED_KEY, result.lastEvaluatedKey());
+        tmp.put(Ddb2Constants.CONSUMED_CAPACITY, result.consumedCapacity());
+        tmp.put(Ddb2Constants.COUNT, result.count());
+        addToResults(tmp);
+    }
+
+    private  Map<String, AttributeValue> determineStartKey() {
+        return exchange.getIn().getHeader(Ddb2Constants.START_KEY, Map.class);
+    }
+
+    private Boolean determineScanIndexForward() {
+        return exchange.getIn().getHeader(Ddb2Constants.SCAN_INDEX_FORWARD, Boolean.class);
+    }
+
+    private Map determineKeyConditions() {
+        return exchange.getIn().getHeader(Ddb2Constants.KEY_CONDITIONS, Map.class);
+    }
+}
diff --git a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/ScanCommand.java b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/ScanCommand.java
new file mode 100644
index 0000000..ff6d7a4
--- /dev/null
+++ b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/ScanCommand.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.Condition;
+import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
+import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
+
+import org.apache.camel.Exchange;
+
+public class ScanCommand extends AbstractDdbCommand {
+
+    public ScanCommand(DynamoDbClient ddbClient, Ddb2Configuration configuration, Exchange exchange) {
+        super(ddbClient, configuration, exchange);
+    }
+
+    @Override
+    public void execute() {
+        ScanResponse result = ddbClient.scan(ScanRequest.builder()
+                .tableName(determineTableName())
+                .limit(determineLimit())
+                .exclusiveStartKey(determineExclusiveStartKey())
+                .scanFilter(determineScanFilter()).build());
+
+        Map tmp = new HashMap<>();
+        tmp.put(Ddb2Constants.ITEMS, result.items());
+        tmp.put(Ddb2Constants.LAST_EVALUATED_KEY, result.lastEvaluatedKey());
+        tmp.put(Ddb2Constants.CONSUMED_CAPACITY, result.consumedCapacity());
+        tmp.put(Ddb2Constants.COUNT, result.count());
+        tmp.put(Ddb2Constants.SCANNED_COUNT, result.scannedCount());
+        addToResults(tmp);
+    }
+
+    @SuppressWarnings("unchecked")
+    private Map<String, Condition> determineScanFilter() {
+        return exchange.getIn().getHeader(Ddb2Constants.SCAN_FILTER, Map.class);
+    }
+}
diff --git a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/UpdateItemCommand.java b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/UpdateItemCommand.java
new file mode 100644
index 0000000..e767057
--- /dev/null
+++ b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/UpdateItemCommand.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+import java.util.Map;
+
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemResponse;
+
+import org.apache.camel.Exchange;
+
+public class UpdateItemCommand extends AbstractDdbCommand {
+
+    public UpdateItemCommand(DynamoDbClient ddbClient, Ddb2Configuration configuration, Exchange exchange) {
+        super(ddbClient, configuration, exchange);
+    }
+
+    @Override
+    public void execute() {
+        UpdateItemResponse result = ddbClient.updateItem(UpdateItemRequest.builder()
+                .tableName(determineTableName())
+                .key(determineKey())
+                .attributeUpdates(determineUpdateValues())
+                .expected(determineUpdateCondition())
+                .returnValues(determineReturnValues()).build());
+
+        addAttributesToResult(result.attributes());
+    }
+
+    @SuppressWarnings("unchecked")
+    private Map<String, AttributeValueUpdate> determineUpdateValues() {
+        return exchange.getIn().getHeader(Ddb2Constants.UPDATE_VALUES, Map.class);
+    }
+
+}
diff --git a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/UpdateTableCommand.java b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/UpdateTableCommand.java
new file mode 100644
index 0000000..beae65a
--- /dev/null
+++ b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/UpdateTableCommand.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+import org.apache.camel.Exchange;
+
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
+import software.amazon.awssdk.services.dynamodb.model.UpdateTableRequest;
+
+public class UpdateTableCommand extends AbstractDdbCommand {
+
+    public UpdateTableCommand(DynamoDbClient ddbClient, Ddb2Configuration configuration, Exchange exchange) {
+        super(ddbClient, configuration, exchange);
+    }
+
+    @Override
+    public void execute() {
+        ddbClient.updateTable(UpdateTableRequest.builder()
+                .tableName(determineTableName())
+                .provisionedThroughput(ProvisionedThroughput.builder()
+                        .readCapacityUnits(determineReadCapacity())
+                        .writeCapacityUnits(determineWriteCapacity()).build()).build());
+    }
+
+    private Long determineReadCapacity() {
+        Long readCapacity = exchange.getIn().getHeader(Ddb2Constants.READ_CAPACITY, Long.class);
+        return readCapacity != null ? readCapacity : configuration.getReadCapacity();
+    }
+
+
+    private Long determineWriteCapacity() {
+        Long writeCapacity = exchange.getIn().getHeader(Ddb2Constants.WRITE_CAPACITY, Long.class);
+        return writeCapacity != null ? writeCapacity : configuration.getWriteCapacity();
+    }
+}
diff --git a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/BigIntComparisons.java b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/BigIntComparisons.java
new file mode 100644
index 0000000..6cf62eb
--- /dev/null
+++ b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/BigIntComparisons.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddbstream;
+
+import java.math.BigInteger;
+
+interface BigIntComparisons {
+
+    /**
+     * @return true if the first parameter is LT/LTEQ/EQ/GTEQ/GT the second
+     */
+    boolean matches(BigInteger first, BigInteger second);
+
+    enum Conditions implements BigIntComparisons {
+        LT() {
+            @Override
+            public boolean matches(BigInteger first, BigInteger second) {
+                return first.compareTo(second) < 0;
+            }
+        },
+
+        LTEQ() {
+            @Override
+            public boolean matches(BigInteger first, BigInteger second) {
+                return first.compareTo(second) <= 0;
+            }
+        }
+        // TODO Add EQ/GTEQ/GT as needed, but note that GTEQ == !LT and GT == !LTEQ and EQ == (!LT && !GT)
+    }
+}
diff --git a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamComponent.java b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamComponent.java
new file mode 100644
index 0000000..7526663
--- /dev/null
+++ b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamComponent.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddbstream;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.annotations.Component;
+import org.apache.camel.support.DefaultComponent;
+
+import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
+
+@Component("aws2-ddbstream")
+public class Ddb2StreamComponent extends DefaultComponent {
+    
+    @Metadata
+    private String accessKey;
+    @Metadata
+    private String secretKey;
+    @Metadata
+    private String region;
+    @Metadata(label = "advanced")    
+    private Ddb2StreamConfiguration configuration;
+
+    public Ddb2StreamComponent() {
+        this(null);
+    }
+
+    public Ddb2StreamComponent(CamelContext context) {
+        super(context);
+        
+        registerExtension(new Ddb2StreamComponentVerifierExtension());
+    }
+
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        
+        if (remaining == null || remaining.trim().length() == 0) {
+            throw new IllegalArgumentException("Table name must be specified.");
+        }
+        Ddb2StreamConfiguration configuration = this.configuration != null ? this.configuration.copy() : new Ddb2StreamConfiguration();
+        configuration.setTableName(remaining);
+        Ddb2StreamEndpoint endpoint = new Ddb2StreamEndpoint(uri, configuration, this);
+        endpoint.getConfiguration().setAccessKey(accessKey);
+        endpoint.getConfiguration().setSecretKey(secretKey);
+        endpoint.getConfiguration().setRegion(region);
+        setProperties(endpoint, parameters);
+        checkAndSetRegistryClient(configuration);
+        if (configuration.getAmazonDynamoDbStreamsClient() == null && (configuration.getAccessKey() == null || configuration.getSecretKey() == null)) {
+            throw new IllegalArgumentException("amazonDDBStreamsClient or accessKey and secretKey must be specified");
+        }
+        return endpoint;
+    }
+    
+    public Ddb2StreamConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    /**
+     * The AWS DDB stream default configuration
+     */
+    public void setConfiguration(Ddb2StreamConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    public String getAccessKey() {
+        return accessKey;
+    }
+
+    /**
+     * Amazon AWS Access Key
+     */
+    public void setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+    }
+
+    public String getSecretKey() {
+        return secretKey;
+    }
+
+    /**
+     * Amazon AWS Secret Key
+     */
+    public void setSecretKey(String secretKey) {
+        this.secretKey = secretKey;
+    }
+
+    public String getRegion() {
+        return region;
+    }
+
+    /**
+     * Amazon AWS Region
+     */
+    public void setRegion(String region) {
+        this.region = region;
+    }
+    
+    private void checkAndSetRegistryClient(Ddb2StreamConfiguration configuration) {
+        Set<DynamoDbStreamsClient> clients = getCamelContext().getRegistry().findByType(DynamoDbStreamsClient.class);
+        if (clients.size() == 1) {
+            configuration.setAmazonDynamoDbStreamsClient(clients.stream().findFirst().get());
+        }
+    }
+}
diff --git a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamComponentVerifierExtension.java b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamComponentVerifierExtension.java
new file mode 100644
index 0000000..aa4c405
--- /dev/null
+++ b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamComponentVerifierExtension.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddbstream;
+
+import java.util.Map;
+
+import org.apache.camel.component.extension.verifier.DefaultComponentVerifierExtension;
+import org.apache.camel.component.extension.verifier.ResultBuilder;
+import org.apache.camel.component.extension.verifier.ResultErrorBuilder;
+import org.apache.camel.component.extension.verifier.ResultErrorHelper;
+
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClientBuilder;
+import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
+import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClientBuilder;
+
+public class Ddb2StreamComponentVerifierExtension extends DefaultComponentVerifierExtension {
+
+    public Ddb2StreamComponentVerifierExtension() {
+        this("aws2-ddbstream");
+    }
+
+    public Ddb2StreamComponentVerifierExtension(String scheme) {
+        super(scheme);
+    }
+
+    // *********************************
+    // Parameters validation
+    // *********************************
+
+    @Override
+    protected Result verifyParameters(Map<String, Object> parameters) {
+
+        ResultBuilder builder = ResultBuilder.withStatusAndScope(Result.Status.OK, Scope.PARAMETERS).error(ResultErrorHelper.requiresOption("accessKey", parameters))
+            .error(ResultErrorHelper.requiresOption("secretKey", parameters)).error(ResultErrorHelper.requiresOption("region", parameters));
+
+        // Validate using the catalog
+
+        super.verifyParametersAgainstCatalog(builder, parameters);
+
+        return builder.build();
+    }
+
+    // *********************************
+    // Connectivity validation
+    // *********************************
+
+    @Override
+    protected Result verifyConnectivity(Map<String, Object> parameters) {
+        ResultBuilder builder = ResultBuilder.withStatusAndScope(Result.Status.OK, Scope.CONNECTIVITY);
+
+        try {
+            Ddb2StreamConfiguration configuration = setProperties(new Ddb2StreamConfiguration(), parameters);
+            AwsBasicCredentials cred = AwsBasicCredentials.create(configuration.getAccessKey(), configuration.getSecretKey());
+            DynamoDbStreamsClientBuilder clientBuilder = DynamoDbStreamsClient.builder();
+            DynamoDbStreamsClient client = clientBuilder.credentialsProvider(StaticCredentialsProvider.create(cred)).region(Region.of(configuration.getRegion())).build();
+            client.listStreams();
+        } catch (SdkClientException e) {
+            ResultErrorBuilder errorBuilder = ResultErrorBuilder.withCodeAndDescription(VerificationError.StandardCode.AUTHENTICATION, e.getMessage())
+                .detail("aws_ddb_stream_exception_message", e.getMessage()).detail(VerificationError.ExceptionAttribute.EXCEPTION_CLASS, e.getClass().getName())
+                .detail(VerificationError.ExceptionAttribute.EXCEPTION_INSTANCE, e);
+
+            builder.error(errorBuilder.build());
+        } catch (Exception e) {
+            builder.error(ResultErrorBuilder.withException(e).build());
+        }
+        return builder.build();
+    }
+}
diff --git a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConfiguration.java b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConfiguration.java
new file mode 100644
index 0000000..a725a7f
--- /dev/null
+++ b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConfiguration.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddbstream;
+
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
+import org.apache.camel.spi.UriPath;
+
+import software.amazon.awssdk.core.Protocol;
+import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType;
+import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
+
+@UriParams
+public class Ddb2StreamConfiguration implements Cloneable {
+    
+    @UriPath(label = "consumer", description = "Name of the dynamodb table")
+    @Metadata(required = true)
+    private String tableName;
+    
+    @UriParam(label = "security", secret = true, description = "Amazon AWS Access Key")
+    private String accessKey;
+    @UriParam(label = "security", secret = true, description = "Amazon AWS Secret Key")
+    private String secretKey;
+    @UriParam(description = "The region in which DDBStreams client needs to work")
+    private String region;
+
+    @UriParam(label = "consumer", description = "Amazon DynamoDB client to use for all requests for this endpoint")
+    private DynamoDbStreamsClient amazonDynamoDbStreamsClient;
+
+    @UriParam(label = "consumer", description = "Maximum number of records that will be fetched in each poll")
+    private int maxResultsPerRequest = 100;
+
+    @UriParam(label = "consumer", description = "Defines where in the DynaboDB stream"
+            + " to start getting records. Note that using TRIM_HORIZON can cause a"
+            + " significant delay before the stream has caught up to real-time."
+            + " if {AT,AFTER}_SEQUENCE_NUMBER are used, then a sequenceNumberProvider"
+            + " MUST be supplied.",
+            defaultValue = "LATEST")
+    private ShardIteratorType iteratorType = ShardIteratorType.LATEST;
+
+    @UriParam(label = "consumer", description = "Provider for the sequence number when"
+            + " using one of the two ShardIteratorType.{AT,AFTER}_SEQUENCE_NUMBER"
+            + " iterator types. Can be a registry reference or a literal sequence number.")
+    private SequenceNumberProvider sequenceNumberProvider;
+    @UriParam(enums = "HTTP,HTTPS", defaultValue = "HTTPS", description = "To define a proxy protocol when instantiating the DDBStreams client")
+    private Protocol proxyProtocol = Protocol.HTTPS;
+    @UriParam(description = "To define a proxy host when instantiating the DDBStreams client")
+    private String proxyHost;
+    @UriParam(description = "To define a proxy port when instantiating the DDBStreams client")
+    private Integer proxyPort;
+    
+    public DynamoDbStreamsClient getAmazonDynamoDbStreamsClient() {
+        return amazonDynamoDbStreamsClient;
+    }
+
+    public void setAmazonDynamoDbStreamsClient(DynamoDbStreamsClient amazonDynamoDbStreamsClient) {
+        this.amazonDynamoDbStreamsClient = amazonDynamoDbStreamsClient;
+    }
+
+    public String getAccessKey() {
+        return accessKey;
+    }
+
+    public void setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+    }
+
+    public String getSecretKey() {
+        return secretKey;
+    }
+
+    public void setSecretKey(String secretKey) {
+        this.secretKey = secretKey;
+    }
+
+    public String getRegion() {
+        return region;
+    }
+
+    public void setRegion(String region) {
+        this.region = region;
+    }
+
+    public int getMaxResultsPerRequest() {
+        return maxResultsPerRequest;
+    }
+
+    public void setMaxResultsPerRequest(int maxResultsPerRequest) {
+        this.maxResultsPerRequest = maxResultsPerRequest;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
+    }
+
+    public ShardIteratorType getIteratorType() {
+        return iteratorType;
+    }
+
+    public void setIteratorType(ShardIteratorType iteratorType) {
+        this.iteratorType = iteratorType;
+    }
+
+    public SequenceNumberProvider getSequenceNumberProvider() {
+        return sequenceNumberProvider;
+    }
+
+    public void setSequenceNumberProvider(SequenceNumberProvider sequenceNumberProvider) {
+        this.sequenceNumberProvider = sequenceNumberProvider;
+    }
+    
+    public Protocol getProxyProtocol() {
+        return proxyProtocol;
+    }
+
+    public void setProxyProtocol(Protocol proxyProtocol) {
+        this.proxyProtocol = proxyProtocol;
+    }
+
+    public String getProxyHost() {
+        return proxyHost;
+    }
+
+    public void setProxyHost(String proxyHost) {
+        this.proxyHost = proxyHost;
+    }
+
+    public Integer getProxyPort() {
+        return proxyPort;
+    }
+
+    public void setProxyPort(Integer proxyPort) {
+        this.proxyPort = proxyPort;
+    }
+    
+    // *************************************************
+    //
+    // *************************************************
+
+    public Ddb2StreamConfiguration copy() {
+        try {
+            return (Ddb2StreamConfiguration)super.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeCamelException(e);
+        }
+    }
+}
diff --git a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java
new file mode 100644
index 0000000..0c78e42
--- /dev/null
+++ b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddbstream;
+
+import java.math.BigInteger;
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.support.ScheduledBatchPollingConsumer;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import software.amazon.awssdk.services.dynamodb.model.ExpiredIteratorException;
+import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
+import software.amazon.awssdk.services.dynamodb.model.Record;
+import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
+
+public class Ddb2StreamConsumer extends ScheduledBatchPollingConsumer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Ddb2StreamConsumer.class);
+
+    private final ShardIteratorHandler shardIteratorHandler;
+    private String lastSeenSequenceNumber;
+
+    public Ddb2StreamConsumer(Ddb2StreamEndpoint endpoint, Processor processor) {
+        this(endpoint, processor, new ShardIteratorHandler(endpoint));
+    }
+    
+    Ddb2StreamConsumer(Ddb2StreamEndpoint endpoint, Processor processor, ShardIteratorHandler shardIteratorHandler) {
+        super(endpoint, processor);
+        this.shardIteratorHandler = shardIteratorHandler;
+    }
+
+    @Override
+    protected int poll() throws Exception {
+        GetRecordsResponse result;
+        try {
+            GetRecordsRequest.Builder req = GetRecordsRequest.builder()
+                        .shardIterator(shardIteratorHandler.getShardIterator(null))
+                        .limit(getEndpoint().getConfiguration().getMaxResultsPerRequest());
+            result = getClient().getRecords(req.build());
+        } catch (ExpiredIteratorException e) {
+            LOG.warn("Expired Shard Iterator, attempting to resume from {}", lastSeenSequenceNumber, e);
+            GetRecordsRequest.Builder req = GetRecordsRequest.builder()
+                        .shardIterator(shardIteratorHandler.getShardIterator(lastSeenSequenceNumber))
+                        .limit(getEndpoint().getConfiguration().getMaxResultsPerRequest());
+            result = getClient().getRecords(req.build());
+        }
+        List<Record> records = result.records();
+
+        Queue<Exchange> exchanges = createExchanges(records, lastSeenSequenceNumber);
+        int processedExchangeCount = processBatch(CastUtils.cast(exchanges));
+
+        shardIteratorHandler.updateShardIterator(result.nextShardIterator());
+        if (!records.isEmpty()) {
+            lastSeenSequenceNumber = records.get(records.size() - 1).dynamodb().sequenceNumber();
+        }
+
+        return processedExchangeCount;
+    }
+
+    @Override
+    public int processBatch(Queue<Object> exchanges) throws Exception {
+        int processedExchanges = 0;
+        while (!exchanges.isEmpty()) {
+            final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
+
+            LOG.trace("Processing exchange [{}] started.", exchange);
+            getAsyncProcessor().process(exchange, new AsyncCallback() {
+                @Override
+                public void done(boolean doneSync) {
+                    LOG.trace("Processing exchange [{}] done.", exchange);
+                }
+            });
+            processedExchanges++;
+        }
+        return processedExchanges;
+    }
+
+    private DynamoDbStreamsClient getClient() {
+        return getEndpoint().getClient();
+    }
+
+    @Override
+    public Ddb2StreamEndpoint getEndpoint() {
+        return (Ddb2StreamEndpoint) super.getEndpoint();
+    }
+
+    private Queue<Exchange> createExchanges(List<Record> records, String lastSeenSequenceNumber) {
+        Queue<Exchange> exchanges = new ArrayDeque<>();
+        BigIntComparisons condition = null;
+        BigInteger providedSeqNum = null;
+        if (lastSeenSequenceNumber != null) {
+            providedSeqNum = new BigInteger(lastSeenSequenceNumber);
+            condition = BigIntComparisons.Conditions.LT;
+        }
+        switch(getEndpoint().getConfiguration().getIteratorType()) {
+        case AFTER_SEQUENCE_NUMBER:
+            condition = BigIntComparisons.Conditions.LT;
+            providedSeqNum = new BigInteger(getEndpoint().getConfiguration().getSequenceNumberProvider().getSequenceNumber());
+            break;
+        case AT_SEQUENCE_NUMBER:
+            condition = BigIntComparisons.Conditions.LTEQ;
+            providedSeqNum = new BigInteger(getEndpoint().getConfiguration().getSequenceNumberProvider().getSequenceNumber());
+            break;
+        default:
+        }
+        for (Record record : records) {
+            BigInteger recordSeqNum = new BigInteger(record.dynamodb().sequenceNumber());
+            if (condition == null || condition.matches(providedSeqNum, recordSeqNum)) {
+                exchanges.add(getEndpoint().createExchange(record));
+            }
+        }
+        return exchanges;
+    }
+}
diff --git a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java
new file mode 100644
index 0000000..c4481414
--- /dev/null
+++ b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddbstream;
+
+import java.net.URI;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.support.ScheduledPollEndpoint;
+import org.apache.camel.util.ObjectHelper;
+
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.http.apache.ProxyConfiguration;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClientBuilder;
+import software.amazon.awssdk.services.dynamodb.model.Record;
+import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
+import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClientBuilder;
+
+/**
+ * The aws-ddbstream component is used for working with Amazon DynamoDB Streams.
+ */
+@UriEndpoint(firstVersion = "3.1.0", scheme = "aws2-ddbstream", title = "AWS 2 DynamoDB Streams",
+        consumerOnly = true, syntax = "aws2-ddbstream:tableName",
+        label = "cloud,messaging,streams")
+public class Ddb2StreamEndpoint extends ScheduledPollEndpoint {
+
+    @UriParam
+    Ddb2StreamConfiguration configuration;
+    
+    private DynamoDbStreamsClient ddbStreamClient;
+
+    public Ddb2StreamEndpoint(String uri, Ddb2StreamConfiguration configuration, Ddb2StreamComponent component) {
+        super(uri, component);
+        this.configuration = configuration;
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        Ddb2StreamConsumer consumer = new Ddb2StreamConsumer(this, processor);
+        consumer.setSchedulerProperties(consumer.getEndpoint().getSchedulerProperties());
+        configureConsumer(consumer);
+        return consumer;
+    }
+
+    Exchange createExchange(Record record) {
+        Exchange ex = super.createExchange();
+        ex.getIn().setBody(record, Record.class);
+
+        return ex;
+    }
+    
+    @Override
+    public void doStart() throws Exception {
+        super.doStart();
+        
+        ddbStreamClient = configuration.getAmazonDynamoDbStreamsClient() != null ? configuration.getAmazonDynamoDbStreamsClient()
+            : createDdbStreamClient();
+    }
+    
+    @Override
+    public void doStop() throws Exception {
+        if (ObjectHelper.isEmpty(configuration.getAmazonDynamoDbStreamsClient())) {
+            if (ddbStreamClient != null) {
+                ddbStreamClient.close();
+            }
+        }
+        super.doStop();
+    }
+
+    public Ddb2StreamConfiguration getConfiguration() {
+        return configuration;
+    }
+    
+    public DynamoDbStreamsClient getClient() {
+        return ddbStreamClient;
+    }
+
+    public String getSequenceNumber() {
+        switch (configuration.getIteratorType()) {
+        case AFTER_SEQUENCE_NUMBER:
+        case AT_SEQUENCE_NUMBER:
+            if (null == configuration.getSequenceNumberProvider()) {
+                throw new IllegalStateException("sequenceNumberProvider must be"
+                        + " provided, either as an implementation of"
+                        + " SequenceNumberProvider or a literal String.");
+            } else {
+                return configuration.getSequenceNumberProvider().getSequenceNumber();
+            }
+        default:
+            return "";
+        }
+    }
+    
+    DynamoDbStreamsClient createDdbStreamClient() {
+    	DynamoDbStreamsClient client = null;
+    	DynamoDbStreamsClientBuilder clientBuilder = DynamoDbStreamsClient.builder();
+        ProxyConfiguration.Builder proxyConfig = null;
+        ApacheHttpClient.Builder httpClientBuilder = null;
+        boolean isClientConfigFound = false;
+        if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && ObjectHelper.isNotEmpty(configuration.getProxyPort())) {
+            proxyConfig = ProxyConfiguration.builder();
+            URI proxyEndpoint = URI.create(configuration.getProxyProtocol() + configuration.getProxyHost() + configuration.getProxyPort());
+            proxyConfig.endpoint(proxyEndpoint);
+            httpClientBuilder = ApacheHttpClient.builder().proxyConfiguration(proxyConfig.build());
+            isClientConfigFound = true;
+        }
+        if (configuration.getAccessKey() != null && configuration.getSecretKey() != null) {
+            AwsBasicCredentials cred = AwsBasicCredentials.create(configuration.getAccessKey(), configuration.getSecretKey());
+            if (isClientConfigFound) {
+                clientBuilder = clientBuilder.httpClientBuilder(httpClientBuilder).credentialsProvider(StaticCredentialsProvider.create(cred));
+            } else {
+                clientBuilder = clientBuilder.credentialsProvider(StaticCredentialsProvider.create(cred));
+            }
+        } else {
+            if (!isClientConfigFound) {
+                clientBuilder = clientBuilder.httpClientBuilder(httpClientBuilder);
+            }
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
+            clientBuilder = clientBuilder.region(Region.of(configuration.getRegion()));
+        }
+        client = clientBuilder.build();
+        return client;
+    }
+
+    @Override
+    public String toString() {
+        return "DdbStreamEndpoint{"
+                + "tableName=" + configuration.getTableName()
+                + ", amazonDynamoDbStreamsClient=[redacted], maxResultsPerRequest=" + configuration.getMaxResultsPerRequest()
+                + ", iteratorType=" + configuration.getIteratorType()
+                + ", sequenceNumberProvider=" + configuration.getSequenceNumberProvider()
+                + ", uri=" + getEndpointUri()
+                + '}';
+    }
+}
diff --git a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/SequenceNumberProvider.java b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/SequenceNumberProvider.java
new file mode 100644
index 0000000..b25fd86
--- /dev/null
+++ b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/SequenceNumberProvider.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddbstream;
+
+public interface SequenceNumberProvider {
+    String getSequenceNumber();
+}
diff --git a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/ShardIteratorHandler.java b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/ShardIteratorHandler.java
new file mode 100644
index 0000000..a8f3f56
--- /dev/null
+++ b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/ShardIteratorHandler.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddbstream;
+
+import java.math.BigInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorResponse;
+import software.amazon.awssdk.services.dynamodb.model.ListStreamsRequest;
+import software.amazon.awssdk.services.dynamodb.model.ListStreamsResponse;
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType;
+import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
+
+class ShardIteratorHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(ShardIteratorHandler.class);
+
+    private final Ddb2StreamEndpoint endpoint;
+    private final ShardList shardList = new ShardList();
+
+    private String currentShardIterator;
+    private Shard currentShard;
+
+    ShardIteratorHandler(Ddb2StreamEndpoint endpoint) {
+        this.endpoint = endpoint;
+    }
+
+    String getShardIterator(String resumeFromSequenceNumber) {
+        ShardIteratorType iteratorType = getEndpoint().getConfiguration().getIteratorType();
+        String sequenceNumber = getEndpoint().getSequenceNumber();
+        if (resumeFromSequenceNumber != null) {
+            // Reset things as we're in an error condition.
+            currentShard = null;
+            currentShardIterator = null;
+            iteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER;
+            sequenceNumber = resumeFromSequenceNumber;
+        }
+        // either return a cached one or get a new one via a GetShardIterator request.
+        if (currentShardIterator == null) {
+            ListStreamsResponse streamsListResult = getClient().listStreams(
+                    ListStreamsRequest.builder().tableName(getEndpoint().getConfiguration().getTableName()).build()
+            );
+            final String streamArn = streamsListResult.streams().get(0).streamArn(); // XXX assumes there is only one stream
+            DescribeStreamResponse streamDescriptionResult = getClient().describeStream(
+                    DescribeStreamRequest.builder().streamArn(streamArn).build()
+            );
+            shardList.addAll(streamDescriptionResult.streamDescription().shards());
+
+            LOG.trace("Current shard is: {} (in {})", currentShard, shardList);
+            if (currentShard == null) {
+                currentShard = resolveNewShard(iteratorType, resumeFromSequenceNumber);
+            } else {
+                currentShard = shardList.nextAfter(currentShard);
+            }
+            shardList.removeOlderThan(currentShard);
+            LOG.trace("Next shard is: {} (in {})", currentShard, shardList);
+
+            GetShardIteratorResponse result = getClient().getShardIterator(
+                    buildGetShardIteratorRequest(streamArn, iteratorType, sequenceNumber)
+            );
+            currentShardIterator = result.shardIterator();
+        }
+        LOG.trace("Shard Iterator is: {}", currentShardIterator);
+        return currentShardIterator;
+    }
+
+    private GetShardIteratorRequest buildGetShardIteratorRequest(final String streamArn, ShardIteratorType iteratorType, String sequenceNumber) {
+        GetShardIteratorRequest.Builder req = GetShardIteratorRequest.builder()
+                .streamArn(streamArn)
+                .shardId(currentShard.shardId())
+                .shardIteratorType(iteratorType);
+        switch (iteratorType) {
+        case AFTER_SEQUENCE_NUMBER:
+        case AT_SEQUENCE_NUMBER:
+            // if you request with a sequence number that is LESS than the
+            // start of the shard, you get a HTTP 400 from AWS.
+            // So only add the sequence number if the endpoints
+            // sequence number is less than or equal to the starting
+            // sequence for the shard.
+            // Otherwise change the shart iterator type to trim_horizon
+            // because we get a 400 when we use one of the
+            // {at,after}_sequence_number iterator types and don't supply
+            // a sequence number.
+            if (BigIntComparisons.Conditions.LTEQ.matches(
+                    new BigInteger(currentShard.sequenceNumberRange().startingSequenceNumber()),
+                    new BigInteger(sequenceNumber)
+            )) {
+                req.sequenceNumber(sequenceNumber);
+            } else {
+                req.shardIteratorType(ShardIteratorType.TRIM_HORIZON);
+            }
+            break;
+        default:
+        }
+        return req.build();
+    }
+
+    private Shard resolveNewShard(ShardIteratorType type, String resumeFrom) {
+        switch(type) {
+        case AFTER_SEQUENCE_NUMBER:
+            return shardList.afterSeq(resumeFrom != null ? resumeFrom : getEndpoint().getSequenceNumber());
+        case AT_SEQUENCE_NUMBER:
+            return shardList.atSeq(getEndpoint().getSequenceNumber());
+        case TRIM_HORIZON:
+            return shardList.first();
+        case LATEST:
+        default:
+            return shardList.last();
+        }
+    }
+
+    void updateShardIterator(String nextShardIterator) {
+        this.currentShardIterator = nextShardIterator;
+    }
+
+    Ddb2StreamEndpoint getEndpoint() {
+        return endpoint;
+    }
+   
+    private DynamoDbStreamsClient getClient() {
+        return getEndpoint().getClient();
+    }
+}
diff --git a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/ShardList.java b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/ShardList.java
new file mode 100644
index 0000000..f904ecc
--- /dev/null
+++ b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/ShardList.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddbstream;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+class ShardList {
+
+    private final Logger log = LoggerFactory.getLogger(ShardList.class);
+
+    private final Map<String, Shard> shards = new HashMap<>();
+
+    void addAll(Collection<Shard> shards) {
+        for (Shard shard : shards) {
+            add(shard);
+        }
+    }
+
+    void add(Shard shard) {
+        shards.put(shard.shardId(), shard);
+    }
+
+    Shard nextAfter(Shard previous) {
+        for (Shard shard : shards.values()) {
+            if (previous.shardId().equals(shard.parentShardId())) {
+                return shard;
+            }
+        }
+        throw new IllegalStateException("Unable to find the next shard for " + previous + " in " + shards);
+    }
+
+    Shard first() {
+        // Potential optimisation: if the two provided sequence numbers are the
+        // same then we can skip the shard entirely. Need to confirm this with AWS.
+        for (Shard shard : shards.values()) {
+            if (!shards.containsKey(shard.parentShardId())) {
+                return shard;
+            }
+        }
+        throw new IllegalStateException("Unable to find an unparented shard in " + shards);
+    }
+
+    Shard last() {
+        Map<String, Shard> shardsByParent = new HashMap<>();
+        for (Shard shard : shards.values()) {
+            shardsByParent.put(shard.parentShardId(), shard);
+        }
+        for (Shard shard : shards.values()) {
+            if (!shardsByParent.containsKey(shard.shardId())) {
+                return shard;
+            }
+        }
+        throw new IllegalStateException("Unable to find a shard with no children " + shards);
+    }
+
+    Shard afterSeq(String sequenceNumber) {
+        return atAfterSeq(sequenceNumber, BigIntComparisons.Conditions.LT);
+    }
+
+    Shard atSeq(String sequenceNumber) {
+        return atAfterSeq(sequenceNumber, BigIntComparisons.Conditions.LTEQ);
+    }
+
+    Shard atAfterSeq(String sequenceNumber, BigIntComparisons condition) {
+        BigInteger atAfter = new BigInteger(sequenceNumber);
+        List<Shard> sorted = new ArrayList<>();
+        sorted.addAll(shards.values());
+        Collections.sort(sorted, StartingSequenceNumberComparator.INSTANCE);
+        for (Shard shard : sorted) {
+            if (shard.sequenceNumberRange().endingSequenceNumber() != null) {
+                BigInteger end = new BigInteger(shard.sequenceNumberRange().endingSequenceNumber());
+                // essentially: after < end or after <= end
+                if (condition.matches(atAfter, end)) {
+                    return shard;
+                }
+
+            }
+        }
+        if (shards.size() > 0) {
+            return sorted.get(sorted.size() - 1);
+        }
+        throw new IllegalStateException("Unable to find a shard with appropriate sequence numbers for " + sequenceNumber + " in " + shards);
+    }
+
+    /**
+     * Removes shards that are older than the provided shard. Does not remove
+     * the provided shard.
+     *
+     * @param removeBefore
+     */
+    void removeOlderThan(Shard removeBefore) {
+        String current = removeBefore.parentShardId();
+
+        int removedShards = 0;
+        while (current != null) {
+            Shard s = shards.remove(current);
+            if (s == null) {
+                current = null;
+            } else {
+                removedShards++;
+                current = s.parentShardId();
+            }
+        }
+        log.trace("removed {} shards from the store, new size is {}", removedShards, shards.size());
+    }
+
+    @Override
+    public String toString() {
+        return "ShardList{" + "shards=" + shards + '}';
+    }
+
+    private enum StartingSequenceNumberComparator implements Comparator<Shard> {
+        INSTANCE() {
+            @Override
+            public int compare(Shard o1, Shard o2) {
+                BigInteger i1 = new BigInteger(o1.sequenceNumberRange().startingSequenceNumber());
+                BigInteger i2 = new BigInteger(o2.sequenceNumberRange().startingSequenceNumber());
+                return i1.compareTo(i2);
+            }
+        }
+    }
+}
diff --git a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/StaticSequenceNumberProvider.java b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/StaticSequenceNumberProvider.java
new file mode 100644
index 0000000..bdf90d5
--- /dev/null
+++ b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/StaticSequenceNumberProvider.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddbstream;
+
+public class StaticSequenceNumberProvider implements SequenceNumberProvider {
+
+    private final String sequenceNumber;
+
+    public StaticSequenceNumberProvider(String sequenceNumber) {
+        this.sequenceNumber = sequenceNumber;
+    }
+
+    @Override
+    public String getSequenceNumber() {
+        return sequenceNumber;
+    }
+}
diff --git a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/StringSequenceNumberConverter.java b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/StringSequenceNumberConverter.java
new file mode 100644
index 0000000..0178d1a
--- /dev/null
+++ b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/StringSequenceNumberConverter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddbstream;
+
+import org.apache.camel.Converter;
+
+// Allow to ignore this type converter if the ddbstream JARs are not present on the classpath
+@Converter(generateLoader = true, ignoreOnLoadError = true)
+public final class StringSequenceNumberConverter {
+
+    private StringSequenceNumberConverter() {
+    }
+
+    @Converter
+    public static SequenceNumberProvider toSequenceNumberProvider(String sequenceNumber) {
+        return new StaticSequenceNumberProvider(sequenceNumber);
+    }
+}
diff --git a/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/AmazonDDBClientMock.java b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/AmazonDDBClientMock.java
new file mode 100644
index 0000000..a406c2e
--- /dev/null
+++ b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/AmazonDDBClientMock.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BatchGetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.BatchGetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.ConsumedCapacity;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DeleteTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeysAndAttributes;
+import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputDescription;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.PutItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
+import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
+import software.amazon.awssdk.services.dynamodb.model.TableDescription;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.UpdateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateTableResponse;
+
+public class AmazonDDBClientMock implements DynamoDbClient {
+    public static final long NOW = 1327709390233L;
+    DescribeTableRequest describeTableRequest;
+    CreateTableRequest createTableRequest;
+    UpdateTableRequest updateTableRequest;
+    DeleteTableRequest deleteTableRequest;
+    PutItemRequest putItemRequest;
+    UpdateItemRequest updateItemRequest;
+    DeleteItemRequest deleteItemRequest;
+    GetItemRequest getItemRequest;
+    BatchGetItemRequest batchGetItemRequest;
+    ScanRequest scanRequest;
+    QueryRequest queryRequest;
+
+    public AmazonDDBClientMock() {
+    }
+
+    @Override
+    public DescribeTableResponse describeTable(DescribeTableRequest describeTableRequest) {
+        this.describeTableRequest = describeTableRequest;
+        String tableName = describeTableRequest.tableName();
+        if ("activeTable".equals(tableName)) {
+            return tableWithStatus(TableStatus.ACTIVE);
+        } else if ("creatibleTable".equals(tableName) && createTableRequest != null) {
+            return tableWithStatus(TableStatus.ACTIVE);
+        } else if ("FULL_DESCRIBE_TABLE".equals(tableName)) {
+            return DescribeTableResponse.builder().table(TableDescription.builder()
+                    .tableName(tableName)
+                    .tableStatus(TableStatus.ACTIVE)
+                    .creationDateTime(Instant.now())
+                    .itemCount(100L)
+                    .keySchema(KeySchemaElement.builder().attributeName("name").build())
+                    .provisionedThroughput(ProvisionedThroughputDescription.builder()
+                            .readCapacityUnits(20L)
+                            .writeCapacityUnits(10L).build())
+                    .tableSizeBytes(1000L).build()).build();
+        }
+        AwsServiceException.Builder builder = AwsServiceException.builder();
+        AwsErrorDetails.Builder builderError = AwsErrorDetails.builder();
+        builderError.errorMessage("Resource not found");
+        builder.awsErrorDetails(builderError.build());
+        AwsServiceException ase = builder.build();
+        throw ase;
+    }
+
+    private DescribeTableResponse tableWithStatus(TableStatus active) {
+        return DescribeTableResponse.builder().table(TableDescription.builder().tableStatus(active).build()).build();
+    }
+
+    @Override
+    public CreateTableResponse createTable(CreateTableRequest createTableRequest) {
+        this.createTableRequest = createTableRequest;
+        return CreateTableResponse.builder().tableDescription(
+                TableDescription.builder().tableStatus(TableStatus.CREATING).build()).build();
+    }
+
+    @Override
+    public UpdateTableResponse updateTable(UpdateTableRequest updateTableRequest) {
+        this.updateTableRequest = updateTableRequest;
+        return null;
+    }
+
+    @Override
+    public DeleteTableResponse deleteTable(DeleteTableRequest deleteTableRequest) {
+        this.deleteTableRequest = deleteTableRequest;
+        return DeleteTableResponse.builder().tableDescription(TableDescription.builder()
+                .provisionedThroughput(ProvisionedThroughputDescription.builder().build())
+                .tableName(deleteTableRequest.tableName())
+                .creationDateTime(Instant.now())
+                .itemCount(10L)
+                .keySchema(new ArrayList<KeySchemaElement>())
+                .tableSizeBytes(20L)
+                .tableStatus(TableStatus.ACTIVE).build()).build();
+    }
+
+    @Override
+    public PutItemResponse putItem(PutItemRequest putItemRequest) {
+        this.putItemRequest = putItemRequest;
+        return PutItemResponse.builder().attributes(getAttributes()).build();
+    }
+
+    private Map<String, AttributeValue> getAttributes() {
+        Map<String, AttributeValue> attributes = new HashMap<>();
+        attributes.put("attrName", AttributeValue.builder().s("attrValue").build());
+        return attributes;
+    }
+
+    @Override
+    public UpdateItemResponse updateItem(UpdateItemRequest updateItemRequest) {
+        this.updateItemRequest = updateItemRequest;
+        return UpdateItemResponse.builder().attributes(getAttributes()).build();
+    }
+
+    @Override
+    public DeleteItemResponse deleteItem(DeleteItemRequest deleteItemRequest) {
+        this.deleteItemRequest = deleteItemRequest;
+        return DeleteItemResponse.builder().attributes(getAttributes()).build();
+    }
+
+    @Override
+    public GetItemResponse getItem(GetItemRequest getItemRequest) {
+        this.getItemRequest = getItemRequest;
+        return GetItemResponse.builder().item(getAttributes()).build();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public BatchGetItemResponse batchGetItem(BatchGetItemRequest batchGetItemRequest) {
+        this.batchGetItemRequest = batchGetItemRequest;
+        Map<String, List<Map<String, AttributeValue>>> responseMap = new HashMap<>();
+        List<Map<String, AttributeValue>> p = new ArrayList<>();
+        p.add(getAttributes());
+        responseMap.put("DOMAIN1", p);
+        Map<String, AttributeValue> keysMap = new HashMap<>();
+        keysMap.put("1", AttributeValue.builder().s("UNPROCESSED_KEY").build());
+        Map<String, KeysAndAttributes> unprocessedKeys = new HashMap<>();
+        unprocessedKeys.put("DOMAIN1", KeysAndAttributes.builder().keys(keysMap).build());
+
+        return BatchGetItemResponse.builder()
+                .responses(responseMap)
+                .unprocessedKeys(unprocessedKeys).build();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public ScanResponse scan(ScanRequest scanRequest) {
+        this.scanRequest = scanRequest;
+        ConsumedCapacity.Builder consumed = ConsumedCapacity.builder();
+        consumed.capacityUnits(1.0);
+        Map<String, AttributeValue> lastEvaluatedKey = new HashMap<>();
+        lastEvaluatedKey.put("1", AttributeValue.builder().s("LAST_KEY").build());
+        return ScanResponse.builder()
+                .consumedCapacity(consumed.build())
+                .count(1)
+                .items(getAttributes())
+                .scannedCount(10)
+                .lastEvaluatedKey(lastEvaluatedKey).build();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public QueryResponse query(QueryRequest queryRequest) {
+        this.queryRequest = queryRequest;
+        ConsumedCapacity.Builder consumed = ConsumedCapacity.builder();
+        consumed.capacityUnits(1.0);
+        Map<String, AttributeValue> lastEvaluatedKey = new HashMap<>();
+        lastEvaluatedKey.put("1", AttributeValue.builder().s("LAST_KEY").build());
+        return QueryResponse.builder()
+                .consumedCapacity(consumed.build())
+                .count(1)
+                .items(getAttributes())
+                .lastEvaluatedKey(lastEvaluatedKey).build();
+    }
+
+	@Override
+	public String serviceName() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	@Override
+	public void close() {
+		// TODO Auto-generated method stub
+		
+	}
+}
diff --git a/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/BatchGetItemsCommandTest.java b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/BatchGetItemsCommandTest.java
new file mode 100644
index 0000000..d351043
--- /dev/null
+++ b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/BatchGetItemsCommandTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.aws2.ddb.BatchGetItemsCommand;
+import org.apache.camel.component.aws2.ddb.Ddb2Configuration;
+import org.apache.camel.component.aws2.ddb.Ddb2Constants;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.Before;
+import org.junit.Test;
+
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.KeysAndAttributes;
+
+import static org.junit.Assert.assertEquals;
+
+public class BatchGetItemsCommandTest {
+
+    private BatchGetItemsCommand command;
+    private AmazonDDBClientMock ddbClient;
+    private Ddb2Configuration configuration;
+    private Exchange exchange;
+
+    @Before
+    public void setUp() {
+        ddbClient = new AmazonDDBClientMock();
+        configuration = new Ddb2Configuration();
+        exchange = new DefaultExchange(new DefaultCamelContext());
+        command = new BatchGetItemsCommand(ddbClient, configuration, exchange);
+    }
+
+    @Test
+    public void execute() {
+        Map<String, AttributeValue> key = new HashMap<>();
+        key.put("1", AttributeValue.builder().s("Key_1").build());
+        Map<String, AttributeValue> unprocessedKey = new HashMap<>();
+        unprocessedKey.put("1", AttributeValue.builder().s("UNPROCESSED_KEY").build());
+        Map<String, KeysAndAttributes> keysAndAttributesMap = new HashMap<>();
+        KeysAndAttributes keysAndAttributes = KeysAndAttributes.builder().keys(key).build();
+        keysAndAttributesMap.put("DOMAIN1", keysAndAttributes);
+        exchange.getIn().setHeader(Ddb2Constants.BATCH_ITEMS, keysAndAttributesMap);
+
+        command.execute();
+
+        assertEquals(keysAndAttributesMap, ddbClient.batchGetItemRequest.requestItems());
+
+
+        List<Map<String, AttributeValue>> batchResponse = (List<Map<String, AttributeValue>>)exchange.getIn().getHeader(Ddb2Constants.BATCH_RESPONSE, Map.class).get("DOMAIN1");
+        AttributeValue value = batchResponse.get(0).get("attrName");
+
+        KeysAndAttributes unProcessedAttributes = (KeysAndAttributes)exchange.getIn().getHeader(
+                Ddb2Constants.UNPROCESSED_KEYS, Map.class).get("DOMAIN1");
+        Map<String, AttributeValue> next = unProcessedAttributes.keys().iterator().next();
+
+        assertEquals(AttributeValue.builder().s("attrValue").build(), value);
+        assertEquals(unprocessedKey, next);
+    }
+}
diff --git a/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/DdbComponentConfigurationTest.java b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/DdbComponentConfigurationTest.java
new file mode 100644
index 0000000..87ffa5d
--- /dev/null
+++ b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/DdbComponentConfigurationTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+import org.apache.camel.component.aws2.ddb.Ddb2Component;
+import org.apache.camel.component.aws2.ddb.Ddb2Endpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import software.amazon.awssdk.core.Protocol;
+import software.amazon.awssdk.regions.Region;
+
+public class DdbComponentConfigurationTest extends CamelTestSupport {
+    
+    @Test
+    public void createEndpointWithComponentElements() throws Exception {
+        Ddb2Component component = context.getComponent("aws2-ddb", Ddb2Component.class);
+        component.setAccessKey("XXX");
+        component.setSecretKey("YYY");
+        Ddb2Endpoint endpoint = (Ddb2Endpoint)component.createEndpoint("aws2-ddb://myTable");
+        
+        assertEquals("myTable", endpoint.getConfiguration().getTableName());
+        assertEquals("XXX", endpoint.getConfiguration().getAccessKey());
+        assertEquals("YYY", endpoint.getConfiguration().getSecretKey());
+    }
+    
+    @Test
+    public void createEndpointWithComponentAndEndpointElements() throws Exception {
+        Ddb2Component component = context.getComponent("aws2-ddb", Ddb2Component.class);
+        component.setAccessKey("XXX");
+        component.setSecretKey("YYY");
+        component.setRegion(Region.US_WEST_1.toString());
+        Ddb2Endpoint endpoint = (Ddb2Endpoint)component.createEndpoint("aws2-ddb://myTable?accessKey=xxxxxx&secretKey=yyyyy&region=US_EAST_1");
+        
+        assertEquals("myTable", endpoint.getConfiguration().getTableName());
+        assertEquals("xxxxxx", endpoint.getConfiguration().getAccessKey());
+        assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey());
+        assertEquals("US_EAST_1", endpoint.getConfiguration().getRegion());
+    }
+    
+    @Test
+    public void createEndpointWithComponentEndpointElementsAndProxy() throws Exception {
+        Ddb2Component component = context.getComponent("aws2-ddb", Ddb2Component.class);
+        component.setAccessKey("XXX");
+        component.setSecretKey("YYY");
+        component.setRegion(Region.US_WEST_1.toString());
+        Ddb2Endpoint endpoint = (Ddb2Endpoint)component.createEndpoint("aws2-ddb://myTable?accessKey=xxxxxx&secretKey=yyyyy&region=US_EAST_1&proxyHost=localhost&proxyPort=9000&proxyProtocol=HTTP");
+        
+        assertEquals("myTable", endpoint.getConfiguration().getTableName());
+        assertEquals("xxxxxx", endpoint.getConfiguration().getAccessKey());
+        assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey());
+        assertEquals("US_EAST_1", endpoint.getConfiguration().getRegion());
+        assertEquals(Protocol.HTTP, endpoint.getConfiguration().getProxyProtocol());
+    }
+    
+}
diff --git a/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/DdbComponentRegistryClientTest.java b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/DdbComponentRegistryClientTest.java
new file mode 100644
index 0000000..3600fa2
--- /dev/null
+++ b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/DdbComponentRegistryClientTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+import org.apache.camel.component.aws2.ddb.Ddb2Component;
+import org.apache.camel.component.aws2.ddb.Ddb2Endpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class DdbComponentRegistryClientTest extends CamelTestSupport {
+    
+    @Test
+    public void createEndpointWithRegistryClient() throws Exception {
+        AmazonDDBClientMock ddbClient = new AmazonDDBClientMock();
+        context.getRegistry().bind("ddbClient", ddbClient);
+        Ddb2Component component = context.getComponent("aws2-ddb", Ddb2Component.class);
+        Ddb2Endpoint endpoint = (Ddb2Endpoint)component.createEndpoint("aws2-ddb://myTable");
+        
+        assertEquals("myTable", endpoint.getConfiguration().getTableName());
+    }
+    
+    @Test(expected = IllegalArgumentException.class)
+    public void createEndpointWithoutRegistryClient() throws Exception {
+        Ddb2Component component = context.getComponent("aws2-ddb", Ddb2Component.class);
+        Ddb2Endpoint endpoint = (Ddb2Endpoint)component.createEndpoint("aws2-ddb://myTable");
+        
+        assertEquals("myTable", endpoint.getConfiguration().getTableName());
+    }
+}
\ No newline at end of file
diff --git a/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/DdbComponentTest.java b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/DdbComponentTest.java
new file mode 100644
index 0000000..151e7c6
--- /dev/null
+++ b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/DdbComponentTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.aws2.ddb.Ddb2Component;
+import org.apache.camel.impl.engine.DefaultProducerTemplate;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class DdbComponentTest extends CamelTestSupport {
+
+    @BindToRegistry("amazonDDBClient")
+    private AmazonDDBClientMock amazonDDBClient = new AmazonDDBClientMock();
+
+    @Test
+    public void whenTableExistsThenDoesntCreateItOnStart() throws Exception {
+        assertNull(amazonDDBClient.createTableRequest);
+    }
+
+
+    @Test
+    public void whenTableIsMissingThenCreateItOnStart() throws Exception {
+        DefaultProducerTemplate.newInstance(context,
+                "aws2-ddb://creatibleTable?amazonDDBClient=#amazonDDBClient");
+        assertEquals("creatibleTable", amazonDDBClient.createTableRequest.tableName());
+    }
+    
+    @Test
+    public void createEndpointWithOnlySecretKeyConfiguration() throws Exception {
+        Ddb2Component component = context.getComponent("aws2-ddb", Ddb2Component.class);
+        component.createEndpoint("aws2-ddb://activeTable?secretKey=xxx");
+    }
+    
+    @Test
+    public void createEndpointWithoutSecretKeyAndAccessKeyConfiguration() throws Exception {
+        Ddb2Component component = context.getComponent("aws2-ddb", Ddb2Component.class);
+        component.createEndpoint("aws2-ddb://activeTable?amazonDDBClient=#amazonDDBClient");
+    }
+
+
+    @Test
+    public void createEndpointWithOnlyAccessKeyAndSecretKey() throws Exception {
+        Ddb2Component component = context.getComponent("aws2-ddb", Ddb2Component.class);
+        component.createEndpoint("aws2-ddb://activeTable?accessKey=xxx&secretKey=yyy");
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                        .to("aws2-ddb://activeTable?amazonDDBClient=#amazonDDBClient");
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/DdbComponentVerifierExtensionTest.java b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/DdbComponentVerifierExtensionTest.java
new file mode 100644
index 0000000..d07a24e
--- /dev/null
+++ b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/DdbComponentVerifierExtensionTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.Component;
+import org.apache.camel.component.extension.ComponentVerifierExtension;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class DdbComponentVerifierExtensionTest extends CamelTestSupport {
+
+    // *************************************************
+    // Tests (parameters)
+    // *************************************************
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    @Test
+    public void testParameters() throws Exception {
+        Component component = context().getComponent("aws2-ddb");
+
+        ComponentVerifierExtension verifier = component.getExtension(ComponentVerifierExtension.class).orElseThrow(IllegalStateException::new);
+
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("secretKey", "l");
+        parameters.put("accessKey", "k");
+        parameters.put("region", "l");
+        parameters.put("tableName", "test");
+
+        ComponentVerifierExtension.Result result = verifier.verify(ComponentVerifierExtension.Scope.PARAMETERS, parameters);
+
+        Assert.assertEquals(ComponentVerifierExtension.Result.Status.OK, result.getStatus());
+    }
+
+    @Test
+    public void testConnectivity() throws Exception {
+        Component component = context().getComponent("aws2-ddb");
+        ComponentVerifierExtension verifier = component.getExtension(ComponentVerifierExtension.class).orElseThrow(IllegalStateException::new);
+
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("secretKey", "l");
+        parameters.put("accessKey", "k");
+        parameters.put("region", "US_EAST_1");
+        parameters.put("tableName", "test");
+
+        ComponentVerifierExtension.Result result = verifier.verify(ComponentVerifierExtension.Scope.CONNECTIVITY, parameters);
+
+        Assert.assertEquals(ComponentVerifierExtension.Result.Status.ERROR, result.getStatus());
+    }
+
+}
diff --git a/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/DeleteItemCommandTest.java b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/DeleteItemCommandTest.java
new file mode 100644
index 0000000..57b54d6
--- /dev/null
+++ b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/DeleteItemCommandTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.aws2.ddb.Ddb2Configuration;
+import org.apache.camel.component.aws2.ddb.Ddb2Constants;
+import org.apache.camel.component.aws2.ddb.DeleteItemCommand;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.Before;
+import org.junit.Test;
+
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.ExpectedAttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.ReturnValue;
+
+import static org.junit.Assert.assertEquals;
+
+public class DeleteItemCommandTest {
+
+    private DeleteItemCommand command;
+    private AmazonDDBClientMock ddbClient;
+    private Ddb2Configuration configuration;
+    private Exchange exchange;
+
+    @Before
+    public void setUp() {
+        ddbClient = new AmazonDDBClientMock();
+        configuration = new Ddb2Configuration();
+        configuration.setTableName("DOMAIN1");
+        exchange = new DefaultExchange(new DefaultCamelContext());
+        command = new DeleteItemCommand(ddbClient, configuration, exchange);
+    }
+
+    @Test
+    public void execute() {
+        Map<String, AttributeValue> key = new HashMap<>();
+        key.put("1", AttributeValue.builder().s("Key_1").build());
+        exchange.getIn().setHeader(Ddb2Constants.KEY, key);
+
+
+        Map<String, ExpectedAttributeValue> updateCondition = new HashMap<>();
+        updateCondition
+                .put("name", ExpectedAttributeValue.builder().attributeValueList(AttributeValue.builder().s("expected value").build()).build());
+        exchange.getIn().setHeader(Ddb2Constants.UPDATE_CONDITION, updateCondition);
+        exchange.getIn().setHeader(Ddb2Constants.RETURN_VALUES, "ALL_OLD");
+
+        command.execute();
+
+        assertEquals("DOMAIN1", ddbClient.deleteItemRequest.tableName());
+        assertEquals(key, ddbClient.deleteItemRequest.key());
+        assertEquals(updateCondition, ddbClient.deleteItemRequest.expected());
+        assertEquals(ReturnValue.ALL_OLD, ddbClient.deleteItemRequest.returnValues());
+        assertEquals(AttributeValue.builder().s("attrValue").build(),
+                exchange.getIn().getHeader(Ddb2Constants.ATTRIBUTES, Map.class).get(
+                        "attrName"));
+    }
+}
diff --git a/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/DeleteTableCommandTest.java b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/DeleteTableCommandTest.java
new file mode 100644
index 0000000..2dbdb54
--- /dev/null
+++ b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/DeleteTableCommandTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+import java.util.ArrayList;
+import java.util.Date;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.aws2.ddb.Ddb2Configuration;
+import org.apache.camel.component.aws2.ddb.Ddb2Constants;
+import org.apache.camel.component.aws2.ddb.DeleteTableCommand;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.Before;
+import org.junit.Test;
+
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputDescription;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+
+import static org.junit.Assert.assertEquals;
+
+public class DeleteTableCommandTest {
+
+    private DeleteTableCommand command;
+    private AmazonDDBClientMock ddbClient;
+    private Ddb2Configuration configuration;
+    private Exchange exchange;
+    
+    @Before
+    public void setUp() {
+        ddbClient = new AmazonDDBClientMock();
+        configuration = new Ddb2Configuration();
+        configuration.setTableName("DOMAIN1");
+        exchange = new DefaultExchange(new DefaultCamelContext());
+        
+        command = new DeleteTableCommand(ddbClient, configuration, exchange);
+    }
+
+    @Test
+    public void testExecute() {
+        command.execute();
+        
+        assertEquals("DOMAIN1", ddbClient.deleteTableRequest.tableName());
+        assertEquals(ProvisionedThroughputDescription.builder().build(), exchange.getIn().getHeader(
+                Ddb2Constants.PROVISIONED_THROUGHPUT));
+        assertEquals(Long.valueOf(10L), exchange.getIn().getHeader(Ddb2Constants.ITEM_COUNT, Long.class));
+        assertEquals(Long.valueOf(20L), exchange.getIn().getHeader(Ddb2Constants.TABLE_SIZE, Long.class));
+        assertEquals(TableStatus.ACTIVE, exchange.getIn().getHeader(Ddb2Constants.TABLE_STATUS, TableStatus.class));
+    }
+}
diff --git a/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/DescribeTableCommandTest.java b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/DescribeTableCommandTest.java
new file mode 100644
index 0000000..84b929f
--- /dev/null
+++ b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/DescribeTableCommandTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.aws2.ddb.Ddb2Configuration;
+import org.apache.camel.component.aws2.ddb.Ddb2Constants;
+import org.apache.camel.component.aws2.ddb.DescribeTableCommand;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.Before;
+import org.junit.Test;
+
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+
+import static org.junit.Assert.assertEquals;
+
+public class DescribeTableCommandTest {
+
+    private DescribeTableCommand command;
+    private AmazonDDBClientMock ddbClient;
+    private Ddb2Configuration configuration;
+    private Exchange exchange;
+
+    @Before
+    public void setUp() {
+        ddbClient = new AmazonDDBClientMock();
+        configuration = new Ddb2Configuration();
+        configuration.setTableName("FULL_DESCRIBE_TABLE");
+        exchange = new DefaultExchange(new DefaultCamelContext());
+        command = new DescribeTableCommand(ddbClient, configuration, exchange);
+    }
+
+    @Test
+    public void testExecute() {
+        command.execute();
+        List<KeySchemaElement> keySchema = new ArrayList<>();
+        keySchema.add(KeySchemaElement.builder().attributeName("name").build());
+        assertEquals("FULL_DESCRIBE_TABLE", ddbClient.describeTableRequest.tableName());
+        assertEquals("FULL_DESCRIBE_TABLE", exchange.getIn().getHeader(Ddb2Constants.TABLE_NAME));
+        assertEquals(TableStatus.ACTIVE, exchange.getIn().getHeader(Ddb2Constants.TABLE_STATUS));
+        
+        assertEquals(100L, exchange.getIn().getHeader(Ddb2Constants.ITEM_COUNT));
+        assertEquals(keySchema,
+                exchange.getIn().getHeader(Ddb2Constants.KEY_SCHEMA));
+        assertEquals(20L, exchange.getIn().getHeader(Ddb2Constants.READ_CAPACITY));
+        assertEquals(10L, exchange.getIn().getHeader(Ddb2Constants.WRITE_CAPACITY));
+        assertEquals(1000L, exchange.getIn().getHeader(Ddb2Constants.TABLE_SIZE));
+    }
+}
diff --git a/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/GetItemCommandTest.java b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/GetItemCommandTest.java
new file mode 100644
index 0000000..95f56da
--- /dev/null
+++ b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/GetItemCommandTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.aws2.ddb.Ddb2Configuration;
+import org.apache.camel.component.aws2.ddb.Ddb2Constants;
+import org.apache.camel.component.aws2.ddb.GetItemCommand;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.Before;
+import org.junit.Test;
+
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+
+import static org.junit.Assert.assertEquals;
+
+public class GetItemCommandTest {
+    private GetItemCommand command;
+    private AmazonDDBClientMock ddbClient;
+    private Ddb2Configuration configuration;
+    private Exchange exchange;
+
+    @Before
+    public void setUp() {
+        ddbClient = new AmazonDDBClientMock();
+        configuration = new Ddb2Configuration();
+        configuration.setTableName("DOMAIN1");
+        exchange = new DefaultExchange(new DefaultCamelContext());
+        command = new GetItemCommand(ddbClient, configuration, exchange);
+    }
+
+    @Test
+    public void execute() {
+        Map<String, AttributeValue> key = new HashMap<>();
+        key.put("1", AttributeValue.builder().s("Key_1").build());
+        exchange.getIn().setHeader(Ddb2Constants.KEY, key);
+
+        List<String> attrNames = Arrays.asList("attrName");
+        exchange.getIn().setHeader(Ddb2Constants.ATTRIBUTE_NAMES, attrNames);
+        exchange.getIn().setHeader(Ddb2Constants.CONSISTENT_READ, true);
+
+        command.execute();
+
+        assertEquals("DOMAIN1", ddbClient.getItemRequest.tableName());
+        assertEquals(attrNames, ddbClient.getItemRequest.attributesToGet());
+        assertEquals(true, ddbClient.getItemRequest.consistentRead());
+        assertEquals(key, ddbClient.getItemRequest.key());
+        assertEquals(AttributeValue.builder().s("attrValue").build(),
+                exchange.getIn().getHeader(Ddb2Constants.ATTRIBUTES, Map.class).get(
+                        "attrName"));
+    }
+}
diff --git a/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/PutItemCommandTest.java b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/PutItemCommandTest.java
new file mode 100644
index 0000000..19c43f7
--- /dev/null
+++ b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/PutItemCommandTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.aws2.ddb.Ddb2Configuration;
+import org.apache.camel.component.aws2.ddb.Ddb2Constants;
+import org.apache.camel.component.aws2.ddb.PutItemCommand;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.Before;
+import org.junit.Test;
+
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.ExpectedAttributeValue;
+
+import static org.junit.Assert.assertEquals;
+
+public class PutItemCommandTest {
+
+    private PutItemCommand command;
+    private AmazonDDBClientMock ddbClient;
+    private Ddb2Configuration configuration;
+    private Exchange exchange;
+
+    @Before
+    public void setUp() {
+        ddbClient = new AmazonDDBClientMock();
+        configuration = new Ddb2Configuration();
+        configuration.setTableName("DOMAIN1");
+        exchange = new DefaultExchange(new DefaultCamelContext());
+        command = new PutItemCommand(ddbClient, configuration, exchange);
+    }
+
+    @Test
+    public void execute() {
+        Map<String, AttributeValue> attributeMap = new HashMap<>();
+        AttributeValue attributeValue = AttributeValue.builder().s("test value").build();
+        attributeMap.put("name", attributeValue);
+        exchange.getIn().setHeader(Ddb2Constants.ITEM, attributeMap);
+
+        Map<String, ExpectedAttributeValue> expectedAttributeValueMap = new HashMap<>();
+        expectedAttributeValueMap.put("name", ExpectedAttributeValue.builder().attributeValueList(attributeValue).build());
+        exchange.getIn().setHeader(Ddb2Constants.UPDATE_CONDITION, expectedAttributeValueMap);
+
+        command.execute();
+
+        assertEquals("DOMAIN1", ddbClient.putItemRequest.tableName());
+        assertEquals(attributeMap, ddbClient.putItemRequest.item());
+        assertEquals(expectedAttributeValueMap, ddbClient.putItemRequest.expected());
+        assertEquals(AttributeValue.builder().s("attrValue").build(),
+                exchange.getIn().getHeader(Ddb2Constants.ATTRIBUTES, Map.class).get("attrName"));
+    }
+}
diff --git a/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/QueryCommandTest.java b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/QueryCommandTest.java
new file mode 100644
index 0000000..880a4e0
--- /dev/null
+++ b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/QueryCommandTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.aws2.ddb.Ddb2Configuration;
+import org.apache.camel.component.aws2.ddb.Ddb2Constants;
+import org.apache.camel.component.aws2.ddb.QueryCommand;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.Before;
+import org.junit.Test;
+
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.ComparisonOperator;
+import software.amazon.awssdk.services.dynamodb.model.Condition;
+import software.amazon.awssdk.services.dynamodb.model.ConsumedCapacity;
+
+import static org.junit.Assert.assertEquals;
+
+public class QueryCommandTest {
+
+    private QueryCommand command;
+    private AmazonDDBClientMock ddbClient;
+    private Ddb2Configuration configuration;
+    private Exchange exchange;
+
+    @Before
+    public void setUp() {
+        ddbClient = new AmazonDDBClientMock();
+        configuration = new Ddb2Configuration();
+        configuration.setTableName("DOMAIN1");
+        exchange = new DefaultExchange(new DefaultCamelContext());
+        command = new QueryCommand(ddbClient, configuration, exchange);
+    }
+
+    @Test
+    public void execute() {
+
+        Map<String, AttributeValue> startKey = new HashMap<>();
+        startKey.put("1", AttributeValue.builder().s("startKey").build());
+
+        List<String> attributeNames = Arrays.asList("attrNameOne", "attrNameTwo");
+        exchange.getIn().setHeader(Ddb2Constants.ATTRIBUTE_NAMES, attributeNames);
+        exchange.getIn().setHeader(Ddb2Constants.CONSISTENT_READ, true);
+        exchange.getIn().setHeader(Ddb2Constants.START_KEY, startKey);
+        exchange.getIn().setHeader(Ddb2Constants.LIMIT, 10);
+        exchange.getIn().setHeader(Ddb2Constants.SCAN_INDEX_FORWARD, true);
+        
+        Map<String, Condition> keyConditions = new HashMap<>();
+        Condition.Builder condition = Condition.builder()
+            .comparisonOperator(ComparisonOperator.GT.toString())
+            .attributeValueList(AttributeValue.builder().n("1985").build());
+        
+        keyConditions.put("1", condition.build());
+        
+        exchange.getIn().setHeader(Ddb2Constants.KEY_CONDITIONS, keyConditions);
+
+        command.execute();
+
+        Map<String, AttributeValue> mapAssert = new HashMap<>();
+        mapAssert.put("1", AttributeValue.builder().s("LAST_KEY").build());
+        ConsumedCapacity consumed = (ConsumedCapacity) exchange.getIn().getHeader(Ddb2Constants.CONSUMED_CAPACITY);
+        assertEquals(Integer.valueOf(1), exchange.getIn().getHeader(Ddb2Constants.COUNT, Integer.class));
+        assertEquals(Double.valueOf(1.0), consumed.capacityUnits());
+        assertEquals(mapAssert, exchange.getIn().getHeader(Ddb2Constants.LAST_EVALUATED_KEY, Map.class));
+        assertEquals(keyConditions, exchange.getIn().getHeader(Ddb2Constants.KEY_CONDITIONS, Map.class));
+
+        Map<?, ?> items = (Map<?, ?>) exchange.getIn().getHeader(Ddb2Constants.ITEMS, List.class).get(0);
+        assertEquals(AttributeValue.builder().s("attrValue").build(), items.get("attrName"));
+    }
+}
diff --git a/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/ScanCommandTest.java b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/ScanCommandTest.java
new file mode 100644
index 0000000..527cad5
--- /dev/null
+++ b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/ScanCommandTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.aws2.ddb.Ddb2Configuration;
+import org.apache.camel.component.aws2.ddb.Ddb2Constants;
+import org.apache.camel.component.aws2.ddb.ScanCommand;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.Before;
+import org.junit.Test;
+
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.ComparisonOperator;
+import software.amazon.awssdk.services.dynamodb.model.Condition;
+import software.amazon.awssdk.services.dynamodb.model.ConsumedCapacity;
+
+import static org.junit.Assert.assertEquals;
+
+public class ScanCommandTest {
+
+    private ScanCommand command;
+    private AmazonDDBClientMock ddbClient;
+    private Ddb2Configuration configuration;
+    private Exchange exchange;
+
+    @Before
+    public void setUp() {
+        ddbClient = new AmazonDDBClientMock();
+        configuration = new Ddb2Configuration();
+        configuration.setTableName("DOMAIN1");
+        exchange = new DefaultExchange(new DefaultCamelContext());
+        command = new ScanCommand(ddbClient, configuration, exchange);
+    }
+
+    @Test
+    public void execute() {
+        Map<String, Condition> scanFilter = new HashMap<>();
+        Condition.Builder condition = Condition.builder()
+                .comparisonOperator(ComparisonOperator.GT.toString())
+                .attributeValueList(AttributeValue.builder().n("1985").build());
+        scanFilter.put("year", condition.build());
+        exchange.getIn().setHeader(Ddb2Constants.SCAN_FILTER, scanFilter);
+
+        command.execute();
+
+        Map<String, AttributeValue> mapAssert = new HashMap<>();
+        mapAssert.put("1", AttributeValue.builder().s("LAST_KEY").build());
+
+        ConsumedCapacity consumed = (ConsumedCapacity) exchange.getIn().getHeader(Ddb2Constants.CONSUMED_CAPACITY);
+        assertEquals(scanFilter, ddbClient.scanRequest.scanFilter());
+        assertEquals(Integer.valueOf(10), exchange.getIn().getHeader(Ddb2Constants.SCANNED_COUNT, Integer.class));
+        assertEquals(Integer.valueOf(1), exchange.getIn().getHeader(Ddb2Constants.COUNT, Integer.class));
+        assertEquals(Double.valueOf(1.0), consumed.capacityUnits());
+        assertEquals(mapAssert, exchange.getIn().getHeader(Ddb2Constants.LAST_EVALUATED_KEY, Map.class));
+
+        Map<?, ?> items = (Map<?, ?>) exchange.getIn().getHeader(Ddb2Constants.ITEMS, List.class).get(0);
+        assertEquals(AttributeValue.builder().s("attrValue").build(), items.get("attrName"));
+    }
+}
diff --git a/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/UpdateItemCommandTest.java b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/UpdateItemCommandTest.java
new file mode 100644
index 0000000..515c8fa
--- /dev/null
+++ b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/UpdateItemCommandTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.aws2.ddb.Ddb2Configuration;
+import org.apache.camel.component.aws2.ddb.Ddb2Constants;
+import org.apache.camel.component.aws2.ddb.UpdateItemCommand;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.Before;
+import org.junit.Test;
+
+import software.amazon.awssdk.services.dynamodb.model.AttributeAction;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
+import software.amazon.awssdk.services.dynamodb.model.ExpectedAttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.ReturnValue;
+
+import static org.junit.Assert.assertEquals;
+
+public class UpdateItemCommandTest {
+
+    private UpdateItemCommand command;
+    private AmazonDDBClientMock ddbClient;
+    private Ddb2Configuration configuration;
+    private Exchange exchange;
+
+    @Before
+    public void setUp() {
+        ddbClient = new AmazonDDBClientMock();
+        configuration = new Ddb2Configuration();
+        configuration.setTableName("DOMAIN1");
+        exchange = new DefaultExchange(new DefaultCamelContext());
+        command = new UpdateItemCommand(ddbClient, configuration, exchange);
+    }
+
+    @Test
+    public void execute() {
+        Map<String, AttributeValue> key = new HashMap<>();
+        key.put("1", AttributeValue.builder().s("Key_1").build());
+        exchange.getIn().setHeader(Ddb2Constants.KEY, key);
+
+        Map<String, AttributeValueUpdate> attributeMap = new HashMap<>();
+        AttributeValueUpdate attributeValue = AttributeValueUpdate.builder().value(
+                AttributeValue.builder().s("new value").build()).action(AttributeAction.ADD).build();
+        attributeMap.put("name", attributeValue);
+        exchange.getIn().setHeader(Ddb2Constants.UPDATE_VALUES, attributeMap);
+
+        Map<String, ExpectedAttributeValue> expectedAttributeValueMap = new HashMap<>();
+        expectedAttributeValueMap
+                .put("name", ExpectedAttributeValue.builder().attributeValueList(AttributeValue.builder().s("expected value").build()).build());
+        exchange.getIn().setHeader(Ddb2Constants.UPDATE_CONDITION, expectedAttributeValueMap);
+        exchange.getIn().setHeader(Ddb2Constants.RETURN_VALUES, "ALL_OLD");
+
+        command.execute();
+
+        assertEquals("DOMAIN1", ddbClient.updateItemRequest.tableName());
+        assertEquals(attributeMap, ddbClient.updateItemRequest.attributeUpdates());
+        assertEquals(key, ddbClient.updateItemRequest.key());
+        assertEquals(expectedAttributeValueMap, ddbClient.updateItemRequest.expected());
+        assertEquals(ReturnValue.ALL_OLD, ddbClient.updateItemRequest.returnValues());
+        assertEquals(AttributeValue.builder().s("attrValue").build(),
+                exchange.getIn().getHeader(Ddb2Constants.ATTRIBUTES, Map.class).get(
+                        "attrName"));
+    }
+}
diff --git a/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/UpdateTableCommandTest.java b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/UpdateTableCommandTest.java
new file mode 100644
index 0000000..57da931
--- /dev/null
+++ b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/UpdateTableCommandTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.aws2.ddb.Ddb2Configuration;
+import org.apache.camel.component.aws2.ddb.UpdateTableCommand;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class UpdateTableCommandTest {
+
+    private UpdateTableCommand command;
+    private AmazonDDBClientMock ddbClient;
+    private Ddb2Configuration configuration;
+    private Exchange exchange;
+    
+    @Before
+    public void setUp() {
+        ddbClient = new AmazonDDBClientMock();
+        configuration = new Ddb2Configuration();
+        configuration.setTableName("DOMAIN1");
+        configuration.setReadCapacity(20L);
+        configuration.setWriteCapacity(30L);
+        exchange = new DefaultExchange(new DefaultCamelContext());
+        
+        command = new UpdateTableCommand(ddbClient, configuration, exchange);
+    }
+
+    @Test
+    public void testExecute() {
+        command.execute();
+        
+        assertEquals("DOMAIN1", ddbClient.updateTableRequest.tableName());
+        assertEquals(Long.valueOf(20), ddbClient.updateTableRequest.provisionedThroughput().readCapacityUnits());
+        assertEquals(Long.valueOf(30), ddbClient.updateTableRequest.provisionedThroughput().writeCapacityUnits());
+    }
+}
diff --git a/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/integration/DdbComponentIntegrationTest.java b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/integration/DdbComponentIntegrationTest.java
new file mode 100644
index 0000000..61295dd
--- /dev/null
+++ b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/integration/DdbComponentIntegrationTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddb.integration;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.aws2.ddb.Ddb2Constants;
+import org.apache.camel.component.aws2.ddb.Ddb2Operations;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.ExpectedAttributeValue;
+
+@Ignore("Must be manually tested. Provide your own credentials below!")
+public class DdbComponentIntegrationTest extends CamelTestSupport {
+
+    @EndpointInject("direct:start")
+    private ProducerTemplate template;
+
+    //To replace with proper credentials:
+    private final String attributeName = "clave";
+    private final String tableName = "TestTable";
+    private final String secretKey = "-";
+    private final String accessKey = "-";
+    private final String region = Region.EU_WEST_2.id();
+    //End credentials replacement
+
+    private final String randomId = String.valueOf(System.currentTimeMillis());
+
+    @Test
+    public void fullLifeCycle() {
+        putItem();
+        getItem();
+        updateItem();
+        deleteItem();
+    }
+
+    public void putItem() {
+        final Map<String, AttributeValue> attributeMap = new HashMap<>();
+        AttributeValue attributeValue = AttributeValue.builder().n(randomId).build();
+        attributeMap.put(attributeName, attributeValue);
+        attributeMap.put("secondary_attribute", AttributeValue.builder().s("value").build());
+
+        Exchange exchange = template.send("direct:start", new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(Ddb2Constants.OPERATION, Ddb2Operations.PutItem);
+                exchange.getIn().setHeader(Ddb2Constants.CONSISTENT_READ, "true");
+                exchange.getIn().setHeader(Ddb2Constants.RETURN_VALUES, "ALL_OLD");
+                exchange.getIn().setHeader(Ddb2Constants.ITEM, attributeMap);
+                exchange.getIn().setHeader(Ddb2Constants.ATTRIBUTE_NAMES, attributeMap.keySet());
+            }
+        });
+
+        assertNotNull(exchange.getIn().getHeader(Ddb2Constants.ITEM));
+    }
+
+
+    public void updateItem() {
+        Map<String, AttributeValue> attributeMap = new HashMap<>();
+        attributeMap.put(attributeName, AttributeValue.builder().n(randomId).build());
+        attributeMap.put("secondary_attribute", AttributeValue.builder().s("new").build());
+
+        Map<String, ExpectedAttributeValue> expectedAttributeValueMap = new HashMap<>();
+        expectedAttributeValueMap.put(attributeName,
+                ExpectedAttributeValue.builder().attributeValueList(AttributeValue.builder().n(randomId).build()).build());
+
+        Exchange exchange = template.send("direct:start", new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(Ddb2Constants.ITEM, attributeMap);
+                exchange.getIn().setHeader(Ddb2Constants.UPDATE_CONDITION, expectedAttributeValueMap);
+                exchange.getIn().setHeader(Ddb2Constants.ATTRIBUTE_NAMES, attributeMap.keySet());
+                exchange.getIn().setHeader(Ddb2Constants.RETURN_VALUES, "ALL_OLD");
+            }
+        });
+
+        assertNotNull(exchange.getIn().getHeader(Ddb2Constants.ATTRIBUTES));
+    }
+
+    public void getItem() {
+
+        Map<String, AttributeValue> key = new HashMap<>();
+        key.put(attributeName, AttributeValue.builder().n(randomId).build());
+
+        Exchange exchange = template.send("direct:start", new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(Ddb2Constants.OPERATION, Ddb2Operations.GetItem);
+                exchange.getIn().setHeader(Ddb2Constants.CONSISTENT_READ, true);
+                exchange.getIn().setHeader(Ddb2Constants.KEY, key);
+                exchange.getIn().setHeader(Ddb2Constants.ATTRIBUTE_NAMES, key.keySet());
+            }
+        });
+
+        assertNotNull(exchange.getIn().getHeader(Ddb2Constants.ATTRIBUTES));
+        assertEquals(AttributeValue.builder().n(randomId).build(),
+                exchange.getIn().getHeader(Ddb2Constants.ATTRIBUTES, Map.class).get(
+                        attributeName));
+    }
+
+    @Test
+    public void deleteItem() {
+        Map<String, AttributeValue> key = new HashMap<>();
+        key.put(attributeName, AttributeValue.builder().n(randomId).build());
+
+        Exchange exchange = template.send("direct:start", new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(Ddb2Constants.KEY, key);
+                exchange.getIn().setHeader(Ddb2Constants.RETURN_VALUES, "ALL_OLD");
+                exchange.getIn().setHeader(Ddb2Constants.OPERATION, Ddb2Operations.DeleteItem);
+                exchange.getIn().setHeader(Ddb2Constants.ATTRIBUTE_NAMES, key.keySet());
+            }
+        });
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                        .to("aws-ddb://" + tableName + "?"
+                                + "region=" + region
+                                + "&accessKey=" + accessKey
+                                + "&secretKey=RAW(" + secretKey + ")");
+            }
+        };
+    }
+}
diff --git a/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/DdbStreamComponentConfigurationTest.java b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/DdbStreamComponentConfigurationTest.java
new file mode 100644
index 0000000..508ec32
--- /dev/null
+++ b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/DdbStreamComponentConfigurationTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddbstream;
+
+import org.apache.camel.component.aws2.ddbstream.Ddb2StreamComponent;
+import org.apache.camel.component.aws2.ddbstream.Ddb2StreamEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import software.amazon.awssdk.core.Protocol;
+import software.amazon.awssdk.regions.Region;
+
+public class DdbStreamComponentConfigurationTest extends CamelTestSupport {
+    
+    @Test
+    public void createEndpointWithAccessAndSecretKey() throws Exception {
+        Ddb2StreamComponent component = context.getComponent("aws2-ddbstream", Ddb2StreamComponent.class);
+        Ddb2StreamEndpoint endpoint = (Ddb2StreamEndpoint)component.createEndpoint("aws2-ddbstreams://myTable?accessKey=xxxxx&secretKey=yyyyy");
+        
+        assertEquals("myTable", endpoint.getConfiguration().getTableName());
+        assertEquals("xxxxx", endpoint.getConfiguration().getAccessKey());
+        assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey());    
+    }
+    
+    @Test
+    public void createEndpointWithComponentElements() throws Exception {
+        Ddb2StreamComponent component = context.getComponent("aws2-ddbstream", Ddb2StreamComponent.class);
+        component.setAccessKey("XXX");
+        component.setSecretKey("YYY");
+        Ddb2StreamEndpoint endpoint = (Ddb2StreamEndpoint)component.createEndpoint("aws2-ddbstreams://myTable");
+        
+        assertEquals("myTable", endpoint.getConfiguration().getTableName());
+        assertEquals("XXX", endpoint.getConfiguration().getAccessKey());
+        assertEquals("YYY", endpoint.getConfiguration().getSecretKey());
+    }
+    
+    @Test
+    public void createEndpointWithComponentAndEndpointElements() throws Exception {
+        Ddb2StreamComponent component = context.getComponent("aws2-ddbstream", Ddb2StreamComponent.class);
+        component.setAccessKey("XXX");
+        component.setSecretKey("YYY");
+        component.setRegion(Region.US_WEST_1.toString());
+        Ddb2StreamEndpoint endpoint = (Ddb2StreamEndpoint)component.createEndpoint("aws2-ddbstreams://myTable?accessKey=xxxxxx&secretKey=yyyyy&region=US_EAST_1");
+        
+        assertEquals("myTable", endpoint.getConfiguration().getTableName());
+        assertEquals("xxxxxx", endpoint.getConfiguration().getAccessKey());
+        assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey());
+        assertEquals("US_EAST_1", endpoint.getConfiguration().getRegion());
+    }
+    
+    @Test
+    public void createEndpointWithComponentEndpointElementsAndProxy() throws Exception {
+        Ddb2StreamComponent component = context.getComponent("aws2-ddbstream", Ddb2StreamComponent.class);
+        component.setAccessKey("XXX");
+        component.setSecretKey("YYY");
+        component.setRegion(Region.US_WEST_1.toString());
+        Ddb2StreamEndpoint endpoint = (Ddb2StreamEndpoint)component.createEndpoint("aws2-ddbstreams://myTable?accessKey=xxxxxx&secretKey=yyyyy&region=US_EAST_1&proxyHost=localhost&proxyPort=9000&proxyProtocol=HTTP");
+        
+        assertEquals("myTable", endpoint.getConfiguration().getTableName());
+        assertEquals("xxxxxx", endpoint.getConfiguration().getAccessKey());
+        assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey());
+        assertEquals("US_EAST_1", endpoint.getConfiguration().getRegion());
+        assertEquals(Protocol.HTTP, endpoint.getConfiguration().getProxyProtocol());
+    }
+    
+}
diff --git a/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/DdbStreamComponentVerifierExtensionTest.java b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/DdbStreamComponentVerifierExtensionTest.java
new file mode 100644
index 0000000..2936b8d
--- /dev/null
+++ b/components/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/DdbStreamComponentVerifierExtensionTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddbstream;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.Component;
+import org.apache.camel.component.extension.ComponentVerifierExtension;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class DdbStreamComponentVerifierExtensionTest extends CamelTestSupport {
+
+    // *************************************************
+    // Tests (parameters)
+    // *************************************************
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    @Test
+    public void testParameters() throws Exception {
+        Component component = context().getComponent("aws2-ddbstream");
+
+        ComponentVerifierExtension verifier = component.getExtension(ComponentVerifierExtension.class).orElseThrow(IllegalStateException::new);
+
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("secretKey", "l");
+        parameters.put("accessKey", "k");
+        parameters.put("region", "l");
+        parameters.put("tableName", "test");
+
+        ComponentVerifierExtension.Result result = verifier.verify(ComponentVerifierExtension.Scope.PARAMETERS, parameters);
+
+        Assert.assertEquals(ComponentVerifierExtension.Result.Status.OK, result.getStatus());
+    }
+
+    @Test
+    public void testConnectivity() throws Exception {
+        Component component = context().getComponent("aws2-ddbstream");
+        ComponentVerifierExtension verifier = component.getExtension(ComponentVerifierExtension.class).orElseThrow(IllegalStateException::new);
+
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("secretKey", "l");
+        parameters.put("accessKey", "k");
+        parameters.put("region", "US_EAST_1");
+        parameters.put("tableName", "test");
+
+        ComponentVerifierExtension.Result result = verifier.verify(ComponentVerifierExtension.Scope.CONNECTIVITY, parameters);
+
+        Assert.assertEquals(ComponentVerifierExtension.Result.Status.ERROR, result.getStatus());
+    }
+
+}
diff --git a/components/camel-aws2-ddb/src/test/resources/log4j2.properties b/components/camel-aws2-ddb/src/test/resources/log4j2.properties
new file mode 100644
index 0000000..040d9e2
--- /dev/null
+++ b/components/camel-aws2-ddb/src/test/resources/log4j2.properties
@@ -0,0 +1,28 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+appender.file.type = File
+appender.file.name = file
+appender.file.fileName = target/camel-aws-ddb-test.log
+appender.file.layout.type = PatternLayout
+appender.file.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
+appender.out.type = Console
+appender.out.name = out
+appender.out.layout.type = PatternLayout
+appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
+rootLogger.level = INFO
+rootLogger.appenderRef.file.ref = file