You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/05/19 15:17:03 UTC

[pulsar] branch master updated: Add dynamodb streams source (#6874)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new deea61e  Add dynamodb streams source (#6874)
deea61e is described below

commit deea61ed8374af62686ed6ff9d8fa50e3eac8172
Author: Jacob Burroughs <ma...@gmail.com>
AuthorDate: Tue May 19 10:16:51 2020 -0500

    Add dynamodb streams source (#6874)
    
    ### Motivation
    
    The goal is to allow consuming DynamoDB Streams directly into Pulsar.
    
    ### Modifications
    
    I created a new source for DynamoDB. It shares less code than ideal with the Kinesis source, because the DynamoDB Streams Kinesis adapter supports KCL v1.x only, while the Kinesis source uses KCL v2.x.  I also abstracted the AWS credential management pieces into their own package.
    
    ### Verifying this change
    
    Create a dynamodb table with streams enabled.  Configure this connector with the stream ARN and appropriate credentials.  Create/update an entry in the table and ensure it is written to pulsar by the connector.
---
 distribution/io/src/assemble/io.xml                |   1 +
 pom.xml                                            |   2 +-
 pulsar-io/{ => aws}/pom.xml                        |  63 +++++----
 .../pulsar/io/aws/AbstractAwsConnector.java}       |  27 ++--
 .../io/aws}/AwsCredentialProviderPlugin.java       |   2 +-
 .../io/aws}/AwsDefaultProviderChainPlugin.java     |   2 +-
 .../io/aws}/STSAssumeRoleProviderPlugin.java       |   2 +-
 pulsar-io/{kinesis => dynamodb}/pom.xml            |  43 ++----
 .../apache/pulsar/io/dynamodb/DynamoDBSource.java  | 135 +++++++++++++++++++
 .../pulsar/io/dynamodb/DynamoDBSourceConfig.java}  | 125 +++++++++++-------
 .../apache/pulsar/io/dynamodb/StreamsRecord.java   |  83 ++++++++++++
 .../pulsar/io/dynamodb/StreamsRecordProcessor.java | 114 ++++++++++++++++
 .../dynamodb/StreamsRecordProcessorFactory.java}   |  33 ++---
 .../resources/META-INF/services/pulsar-io.yaml     |  22 ++++
 .../io/dynamodb/DynamoDBSourceConfigTests.java     | 146 +++++++++++++++++++++
 .../dynamodb/src/test/resources/sourceConfig.yaml  |  32 +++++
 pulsar-io/kinesis/pom.xml                          |  18 +--
 .../io/kinesis/AwsCredentialProviderPlugin.java    |  56 +-------
 .../io/kinesis/AwsDefaultProviderChainPlugin.java  |  34 ++---
 .../org/apache/pulsar/io/kinesis/KinesisSink.java  |   9 +-
 .../apache/pulsar/io/kinesis/KinesisSource.java    |   4 +-
 .../pulsar/io/kinesis/KinesisSourceConfig.java     |   1 +
 .../io/kinesis/STSAssumeRoleProviderPlugin.java    |  52 ++------
 .../apache/pulsar/io/kinesis/KinesisSinkTest.java  |   1 +
 pulsar-io/pom.xml                                  |   2 +
 site2/docs/io-connectors.md                        |   5 +
 ...{io-kinesis-source.md => io-dynamodb-source.md} |  33 +++--
 site2/docs/io-dynamodb.md                          |   5 +
 site2/docs/io-kinesis-sink.md                      |   6 +-
 site2/docs/io-kinesis-source.md                    |   2 +-
 site2/website/data/connectors.js                   |   6 +
 31 files changed, 761 insertions(+), 305 deletions(-)

diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml
index a35fb98..3598786 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -74,5 +74,6 @@
     <file><source>${basedir}/../../pulsar-io/redis/target/pulsar-io-redis-${project.version}.nar</source></file>
     <file><source>${basedir}/../../pulsar-io/flume/target/pulsar-io-flume-${project.version}.nar</source></file>
     <file><source>${basedir}/../../pulsar-io/solr/target/pulsar-io-solr-${project.version}.nar</source></file>
+    <file><source>${basedir}/../../pulsar-io/dynamodb/target/pulsar-io-dynamodb-${project.version}.nar</source></file>
   </files>
 </assembly>
diff --git a/pom.xml b/pom.xml
index f61432c..a1080c1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -189,7 +189,7 @@ flexible messaging model and an intuitive client API.</description>
     <aerospike-client.version>4.4.8</aerospike-client.version>
     <kafka-client.version>2.3.0</kafka-client.version>
     <rabbitmq-client.version>5.1.1</rabbitmq-client.version>
-    <aws-sdk.version>1.11.297</aws-sdk.version>
+    <aws-sdk.version>1.11.774</aws-sdk.version>
     <avro.version>1.9.1</avro.version>
     <joda.version>2.10.1</joda.version>
     <jclouds.version>2.2.0</jclouds.version>
diff --git a/pulsar-io/pom.xml b/pulsar-io/aws/pom.xml
similarity index 51%
copy from pulsar-io/pom.xml
copy to pulsar-io/aws/pom.xml
index c1f7ff4..95513e5 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/aws/pom.xml
@@ -19,44 +19,41 @@
 
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
-  <packaging>pom</packaging>
   <parent>
     <groupId>org.apache.pulsar</groupId>
-    <artifactId>pulsar</artifactId>
+    <artifactId>pulsar-io</artifactId>
     <version>2.6.0-SNAPSHOT</version>
   </parent>
 
-  <artifactId>pulsar-io</artifactId>
-  <name>Pulsar IO :: Parent</name>
-
-  <modules>
-    <module>core</module>
-    <module>common</module>
-    <module>docs</module>
-    <module>twitter</module>
-    <module>cassandra</module>
-    <module>aerospike</module>
-    <module>kafka</module>
-    <module>rabbitmq</module>
-    <module>kinesis</module>
-    <module>hdfs3</module>
-    <module>jdbc</module>
-    <module>data-generator</module>
-    <module>elastic-search</module>
-    <module>kafka-connect-adaptor</module>
-    <module>debezium</module>
-    <module>hdfs2</module>
-    <module>canal</module>
-    <module>file</module>
-    <module>netty</module>
-    <module>hbase</module>
-    <module>mongo</module>
-    <module>flume</module>
-    <module>redis</module>
-    <module>solr</module>
-    <module>influxdb</module>
-  </modules>
+  <artifactId>pulsar-io-aws</artifactId>
+  <name>Pulsar IO :: IO AWS</name>
 
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-io-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+    </dependency>
+
+    <!-- aws dependencies -->
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-sts</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>sts</artifactId>
+      <version>2.10.56</version>
+    </dependency>
+	<!-- /aws dependencies -->
+
+  </dependencies>
 </project>
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AbstractKinesisConnector.java b/pulsar-io/aws/src/main/java/org/apache/pulsar/io/aws/AbstractAwsConnector.java
similarity index 83%
rename from pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AbstractKinesisConnector.java
rename to pulsar-io/aws/src/main/java/org/apache/pulsar/io/aws/AbstractAwsConnector.java
index dc71e90..f8b84b0 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AbstractKinesisConnector.java
+++ b/pulsar-io/aws/src/main/java/org/apache/pulsar/io/aws/AbstractAwsConnector.java
@@ -16,10 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.kinesis;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
+package org.apache.pulsar.io.aws;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
@@ -32,16 +29,17 @@ import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 
 import lombok.extern.slf4j.Slf4j;
+import software.amazon.awssdk.utils.StringUtils;
 
 @Slf4j
-public abstract class AbstractKinesisConnector {
+public abstract class AbstractAwsConnector {
     
     public static final String ACCESS_KEY_NAME = "accessKey";
     public static final String SECRET_KEY_NAME = "secretKey";
 
-    protected AwsCredentialProviderPlugin createCredentialProvider(String awsCredentialPluginName,
-            String awsCredentialPluginParam) {
-        if (isNotBlank(awsCredentialPluginName)) {
+    public AwsCredentialProviderPlugin createCredentialProvider(String awsCredentialPluginName,
+                                                                   String awsCredentialPluginParam) {
+        if (StringUtils.isNotBlank(awsCredentialPluginName)) {
             return createCredentialProviderWithPlugin(awsCredentialPluginName, awsCredentialPluginParam);
         } else {
             return defaultCredentialProvider(awsCredentialPluginParam);
@@ -79,16 +77,19 @@ public abstract class AbstractKinesisConnector {
      * @param awsCredentialPluginParam
      * @return
      */
-    protected AwsCredentialProviderPlugin defaultCredentialProvider(String awsCredentialPluginParam) {
+    public AwsCredentialProviderPlugin defaultCredentialProvider(String awsCredentialPluginParam) {
         Map<String, String> credentialMap = new Gson().fromJson(awsCredentialPluginParam,
                 new TypeToken<Map<String, String>>() {
                 }.getType());
         String accessKey = credentialMap.get(ACCESS_KEY_NAME);
         String secretKey = credentialMap.get(SECRET_KEY_NAME);
-        checkArgument(isNotBlank(accessKey) && isNotBlank(secretKey),
-                String.format(
-                        "Default %s and %s must be present into json-map if AwsCredentialProviderPlugin not provided",
-                        ACCESS_KEY_NAME, SECRET_KEY_NAME));
+        if (!(StringUtils.isNotBlank(accessKey) && StringUtils.isNotBlank(secretKey))) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Default %s and %s must be present into json-map if AwsCredentialProviderPlugin not provided",
+                            ACCESS_KEY_NAME, SECRET_KEY_NAME)
+            );
+        }
         return new AwsCredentialProviderPlugin() {
             @Override
             public void init(String param) {
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java b/pulsar-io/aws/src/main/java/org/apache/pulsar/io/aws/AwsCredentialProviderPlugin.java
similarity index 98%
copy from pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java
copy to pulsar-io/aws/src/main/java/org/apache/pulsar/io/aws/AwsCredentialProviderPlugin.java
index 15ba7c5..4c87921 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java
+++ b/pulsar-io/aws/src/main/java/org/apache/pulsar/io/aws/AwsCredentialProviderPlugin.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.pulsar.io.kinesis;
+package org.apache.pulsar.io.aws;
 
 import java.io.Closeable;
 
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsDefaultProviderChainPlugin.java b/pulsar-io/aws/src/main/java/org/apache/pulsar/io/aws/AwsDefaultProviderChainPlugin.java
similarity index 97%
copy from pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsDefaultProviderChainPlugin.java
copy to pulsar-io/aws/src/main/java/org/apache/pulsar/io/aws/AwsDefaultProviderChainPlugin.java
index 6ed6b3b..6717b44 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsDefaultProviderChainPlugin.java
+++ b/pulsar-io/aws/src/main/java/org/apache/pulsar/io/aws/AwsDefaultProviderChainPlugin.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.kinesis;
+package org.apache.pulsar.io.aws;
 
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/STSAssumeRoleProviderPlugin.java b/pulsar-io/aws/src/main/java/org/apache/pulsar/io/aws/STSAssumeRoleProviderPlugin.java
similarity index 98%
copy from pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/STSAssumeRoleProviderPlugin.java
copy to pulsar-io/aws/src/main/java/org/apache/pulsar/io/aws/STSAssumeRoleProviderPlugin.java
index e3d133b..9f24725 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/STSAssumeRoleProviderPlugin.java
+++ b/pulsar-io/aws/src/main/java/org/apache/pulsar/io/aws/STSAssumeRoleProviderPlugin.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.kinesis;
+package org.apache.pulsar.io.aws;
 
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
diff --git a/pulsar-io/kinesis/pom.xml b/pulsar-io/dynamodb/pom.xml
similarity index 77%
copy from pulsar-io/kinesis/pom.xml
copy to pulsar-io/dynamodb/pom.xml
index 43c5ed0..dbcb9f4 100644
--- a/pulsar-io/kinesis/pom.xml
+++ b/pulsar-io/dynamodb/pom.xml
@@ -27,8 +27,8 @@
     <version>2.6.0-SNAPSHOT</version>
   </parent>
 
-  <artifactId>pulsar-io-kinesis</artifactId>
-  <name>Pulsar IO :: Kinesis</name>
+  <artifactId>pulsar-io-dynamodb</artifactId>
+  <name>Pulsar IO :: DynamoDB</name>
 
   <dependencies>
 
@@ -46,6 +46,12 @@
     </dependency>
 
     <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-aws</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
     </dependency>
@@ -72,42 +78,19 @@
       <artifactId>gson</artifactId>
     </dependency>
 
-    <!-- kinesis dependencies -->
+    <!-- dynamodb dependencies -->
     <dependency>
       <groupId>com.amazonaws</groupId>
       <artifactId>aws-java-sdk-core</artifactId>
     </dependency>
 
-    <dependency>
-      <groupId>software.amazon.kinesis</groupId>
-      <artifactId>amazon-kinesis-client</artifactId>
-      <version>2.2.8</version>
-    </dependency>
-
-    <dependency>
-      <groupId>com.amazonaws</groupId>
-      <artifactId>amazon-kinesis-producer</artifactId>
-      <version>0.14.0</version>
-    </dependency>
-
+    <!--  This pulls in the appropriate version of the kinesis consumer library as a transitive dependency -->
     <dependency>
       <groupId>com.amazonaws</groupId>
-      <artifactId>aws-java-sdk-sts</artifactId>
-      <version>1.11.619</version>
-    </dependency>
-
-    <dependency>
-      <groupId>software.amazon.awssdk</groupId>
-      <artifactId>sts</artifactId>
-      <version>2.10.56</version>
-    </dependency>
-	<!-- /kinesis dependencies -->
-
-    <dependency>
-      <groupId>com.google.flatbuffers</groupId>
-      <artifactId>flatbuffers-java</artifactId>
-      <version>1.9.0</version>
+      <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
+      <version>1.5.1</version>
     </dependency>
+    <!-- /dynamodb dependencies -->
 
   </dependencies>
 
diff --git a/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSource.java b/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSource.java
new file mode 100644
index 0000000..f43e93c
--- /dev/null
+++ b/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSource.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.dynamodb;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
+import java.net.InetAddress;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Stream;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
+import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
+import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerFactory;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.io.aws.AbstractAwsConnector;
+import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
+import org.apache.pulsar.io.core.Source;
+import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+
+@Connector(
+        name = "dynamodb",
+        type = IOType.SOURCE,
+        help = "A source connector that copies messages from DynamoDB Streams to Pulsar",
+        configClass = DynamoDBSourceConfig.class
+    )
+@Slf4j
+public class DynamoDBSource extends AbstractAwsConnector implements Source<byte[]> {
+
+    private LinkedBlockingQueue<StreamsRecord> queue;
+    private DynamoDBSourceConfig dynamodbSourceConfig;
+    private KinesisClientLibConfiguration kinesisClientLibConfig;
+    private IRecordProcessorFactory recordProcessorFactory;
+    private String workerId;
+    private Worker worker;
+    private Thread workerThread;
+    private Throwable threadEx;
+
+
+    @Override
+    public void close() throws Exception {
+        worker.shutdown();
+    }
+
+    @Override
+    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
+        this.dynamodbSourceConfig = DynamoDBSourceConfig.load(config);
+        
+        checkArgument(isNotBlank(dynamodbSourceConfig.getAwsDynamodbStreamArn()), "empty dynamo-stream arn");
+//       Even if the endpoint is set, it seems to require a region to go with it
+        checkArgument(isNotBlank(dynamodbSourceConfig.getAwsRegion()),
+                     "The aws-region must be set");
+        checkArgument(isNotBlank(dynamodbSourceConfig.getAwsCredentialPluginParam()), "empty aws-credential param");
+        
+        if (dynamodbSourceConfig.getInitialPositionInStream() == InitialPositionInStream.AT_TIMESTAMP) {
+            checkArgument((dynamodbSourceConfig.getStartAtTime() != null),"Timestamp must be specified");
+        }
+        
+        queue = new LinkedBlockingQueue<> (dynamodbSourceConfig.getReceiveQueueSize());
+        workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
+        
+        AwsCredentialProviderPlugin credentialsProvider = createCredentialProvider(
+                dynamodbSourceConfig.getAwsCredentialPluginName(),
+                dynamodbSourceConfig.getAwsCredentialPluginParam());
+
+        AmazonDynamoDBStreams dynamoDBStreamsClient = dynamodbSourceConfig.buildDynamoDBStreamsClient(credentialsProvider);
+        AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(dynamoDBStreamsClient);
+        recordProcessorFactory = new StreamsRecordProcessorFactory(queue, dynamodbSourceConfig);
+
+        kinesisClientLibConfig = new KinesisClientLibConfiguration(dynamodbSourceConfig.getApplicationName(),
+                dynamodbSourceConfig.getAwsDynamodbStreamArn(),
+                credentialsProvider.getCredentialProvider(),
+                workerId)
+                .withRegionName(dynamodbSourceConfig.getAwsRegion())
+                .withInitialPositionInStream(dynamodbSourceConfig.getInitialPositionInStream());
+
+        if(kinesisClientLibConfig.getInitialPositionInStream() == InitialPositionInStream.AT_TIMESTAMP) {
+            kinesisClientLibConfig.withTimestampAtInitialPositionInStream(dynamodbSourceConfig.getStartAtTime());
+        }
+
+        worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(recordProcessorFactory,
+                kinesisClientLibConfig,
+                adapterClient,
+                dynamodbSourceConfig.buildDynamoDBClient(credentialsProvider),
+                dynamodbSourceConfig.buildCloudwatchClient(credentialsProvider));
+
+        workerThread = new Thread(worker);
+        workerThread.setDaemon(true);
+        threadEx = null;
+        workerThread.setUncaughtExceptionHandler((t, ex) -> {
+            threadEx = ex;
+            log.error("Worker died with error", ex);
+        });
+        workerThread.start();
+    }
+
+    @Override
+    public StreamsRecord read() throws Exception {
+        try {
+            return queue.take();
+        } catch (InterruptedException ex) {
+            log.warn("Got interrupted when trying to fetch out of the queue");
+            if (threadEx != null) {
+                log.error("error from scheduler", threadEx);
+            }
+            throw ex;
+        }
+    }
+
+}
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java b/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSourceConfig.java
similarity index 54%
copy from pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java
copy to pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSourceConfig.java
index 8f31996..ef8f43f 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java
+++ b/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSourceConfig.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.kinesis;
+package org.apache.pulsar.io.dynamodb;
 
 import java.io.File;
 import java.io.IOException;
@@ -25,30 +25,66 @@ import java.net.URI;
 import java.util.Date;
 import java.util.Map;
 
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
+import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClientBuilder;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
+import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
 import org.apache.pulsar.io.core.annotations.FieldDoc;
 
-import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
-import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClientBuilder;
-import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
-import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClientBuilder;
-import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
-import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
-import software.amazon.kinesis.common.InitialPositionInStreamExtended;
-import software.amazon.kinesis.common.KinesisClientUtil;
-import software.amazon.kinesis.common.InitialPositionInStream;
+import software.amazon.awssdk.regions.Region;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 
 import lombok.Data;
-import lombok.EqualsAndHashCode;
 
 @Data
-@EqualsAndHashCode(callSuper=true)
-public class KinesisSourceConfig extends BaseKinesisConfig implements Serializable {
+public class DynamoDBSourceConfig implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
     @FieldDoc(
+            required = false,
+            defaultValue = "",
+            help = "Dynamodb streams end-point url. It can be found at https://docs.aws.amazon.com/general/latest/gr/rande.html"
+    )
+    private String awsEndpoint = "";
+
+    @FieldDoc(
+            required = false,
+            defaultValue = "",
+            help = "Appropriate aws region. E.g. us-west-1, us-west-2"
+    )
+    private String awsRegion = "";
+
+    @FieldDoc(
+            required = true,
+            defaultValue = "",
+            help = "Dynamodb stream arn"
+    )
+    private String awsDynamodbStreamArn = "";
+
+    @FieldDoc(
+            required = false,
+            defaultValue = "",
+            help = "Fully-Qualified class name of implementation of AwsCredentialProviderPlugin."
+                    + " It is a factory class which creates an AWSCredentialsProvider that will be used by dynamodb."
+                    + " If it is empty then dynamodb will create a default AWSCredentialsProvider which accepts json-map"
+                    + " of credentials in `awsCredentialPluginParam`")
+    private String awsCredentialPluginName = "";
+
+    @FieldDoc(
+            required = false,
+            defaultValue = "",
+            help = "json-parameters to initialize `AwsCredentialsProviderPlugin`")
+    private String awsCredentialPluginParam = "";
+
+    @FieldDoc(
         required = false,
         defaultValue = "LATEST",
         help = "Used to specify the position in the stream where the connector should start from.\n"
@@ -80,16 +116,16 @@ public class KinesisSourceConfig extends BaseKinesisConfig implements Serializab
     @FieldDoc(
         required = false,
         defaultValue = "Apache Pulsar IO Connector",
-        help = "Name of the Amazon Kinesis application. By default the application name is included "
+        help = "Name of the dynamodb consumer application. By default the application name is included "
                 + "in the user agent string used to make AWS requests. This can assist with troubleshooting "
                 + "(e.g. distinguish requests made by separate connectors instances)."
     )
-    private String applicationName = "pulsar-kinesis";
+    private String applicationName = "pulsar-dynamodb";
 
     @FieldDoc(
         required = false,
         defaultValue = "60000",
-        help = "The frequency of the Kinesis stream checkpointing (in milliseconds)"
+        help = "The frequency of the stream checkpointing (in milliseconds)"
     )
     private long checkpointInterval = 60000L;
 
@@ -97,7 +133,7 @@ public class KinesisSourceConfig extends BaseKinesisConfig implements Serializab
         required = false,
         defaultValue = "3000",
         help = "The amount of time to delay between requests when the connector encounters a Throttling"
-                + "exception from AWS Kinesis (in milliseconds)"
+                + "exception from dynamodb (in milliseconds)"
     )
     private long backoffTime = 3000L;
 
@@ -132,60 +168,57 @@ public class KinesisSourceConfig extends BaseKinesisConfig implements Serializab
     )
     private String cloudwatchEndpoint = "";
 
-    @FieldDoc(
-        required = false,
-        defaultValue = "true",
-        help = "When true, uses Kinesis enhanced fan-out, when false, uses polling"
-    )
-    private boolean useEnhancedFanOut = true;
-
 
-    public static KinesisSourceConfig load(String yamlFile) throws IOException {
+    public static DynamoDBSourceConfig load(String yamlFile) throws IOException {
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
-        return mapper.readValue(new File(yamlFile), KinesisSourceConfig.class);
+        return mapper.readValue(new File(yamlFile), DynamoDBSourceConfig.class);
     }
 
-    public static KinesisSourceConfig load(Map<String, Object> map) throws IOException {
+    public static DynamoDBSourceConfig load(Map<String, Object> map) throws IOException {
         ObjectMapper mapper = new ObjectMapper();
-        return mapper.readValue(new ObjectMapper().writeValueAsString(map), KinesisSourceConfig.class);
+        return mapper.readValue(new ObjectMapper().writeValueAsString(map), DynamoDBSourceConfig.class);
     }
 
-    public KinesisAsyncClient buildKinesisAsyncClient(AwsCredentialProviderPlugin credPlugin) {
-        KinesisAsyncClientBuilder builder = KinesisAsyncClient.builder();
+    protected Region regionAsV2Region() {
+        return Region.of(this.getAwsRegion());
+    }
+
+    public AmazonDynamoDBStreams buildDynamoDBStreamsClient(AwsCredentialProviderPlugin credPlugin) {
+        AmazonDynamoDBStreamsClientBuilder builder = AmazonDynamoDBStreamsClientBuilder.standard();
 
         if (!this.getAwsEndpoint().isEmpty()) {
-            builder.endpointOverride(URI.create(this.getAwsEndpoint()));
+            builder.setEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(this.getAwsEndpoint(), this.getAwsRegion()));
         }
         if (!this.getAwsRegion().isEmpty()) {
-            builder.region(this.regionAsV2Region());
+            builder.setRegion(this.getAwsRegion());
         }
-        builder.credentialsProvider(credPlugin.getV2CredentialsProvider());
-        return KinesisClientUtil.createKinesisAsyncClient(builder);
+        builder.setCredentials(credPlugin.getCredentialProvider());
+        return builder.build();
     }
 
-    public DynamoDbAsyncClient buildDynamoAsyncClient(AwsCredentialProviderPlugin credPlugin) {
-        DynamoDbAsyncClientBuilder builder = DynamoDbAsyncClient.builder();
+    public AmazonDynamoDB buildDynamoDBClient(AwsCredentialProviderPlugin credPlugin) {
+        AmazonDynamoDBClientBuilder builder = AmazonDynamoDBClientBuilder.standard();
 
-        if (!this.getDynamoEndpoint().isEmpty()) {
-            builder.endpointOverride(URI.create(this.getDynamoEndpoint()));
+        if (!this.getAwsEndpoint().isEmpty()) {
+            builder.setEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(this.getDynamoEndpoint(), this.getAwsRegion()));
         }
         if (!this.getAwsRegion().isEmpty()) {
-            builder.region(this.regionAsV2Region());
+            builder.setRegion(this.getAwsRegion());
         }
-        builder.credentialsProvider(credPlugin.getV2CredentialsProvider());
+        builder.setCredentials(credPlugin.getCredentialProvider());
         return builder.build();
     }
 
-    public CloudWatchAsyncClient buildCloudwatchAsyncClient(AwsCredentialProviderPlugin credPlugin) {
-        CloudWatchAsyncClientBuilder builder = CloudWatchAsyncClient.builder();
+    public AmazonCloudWatch buildCloudwatchClient(AwsCredentialProviderPlugin credPlugin) {
+        AmazonCloudWatchClientBuilder builder = AmazonCloudWatchClientBuilder.standard();
 
-        if (!this.getCloudwatchEndpoint().isEmpty()) {
-            builder.endpointOverride(URI.create(this.getCloudwatchEndpoint()));
+        if (!this.getAwsEndpoint().isEmpty()) {
+            builder.setEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(this.getCloudwatchEndpoint(), this.getAwsRegion()));
         }
         if (!this.getAwsRegion().isEmpty()) {
-            builder.region(this.regionAsV2Region());
+            builder.setRegion(this.getAwsRegion());
         }
-        builder.credentialsProvider(credPlugin.getV2CredentialsProvider());
+        builder.setCredentials(credPlugin.getCredentialProvider());
         return builder.build();
     }
 
diff --git a/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/StreamsRecord.java b/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/StreamsRecord.java
new file mode 100644
index 0000000..c5720b3
--- /dev/null
+++ b/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/StreamsRecord.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.dynamodb;
+
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter;
+import lombok.Getter;
+import org.apache.pulsar.functions.api.Record;
+import software.amazon.awssdk.utils.StringUtils;
+
+/**
+ * A Pulsar {@link Record} built from one record handed over by the KCL v1 worker.
+ *
+ * <p>This is a direct adaptation of the kinesis record for kcl v1,
+ * with a little branching added for dynamo-specific logic.
+ */
+@Getter
+public class StreamsRecord implements Record<byte[]> {
+
+    public static final String ARRIVAL_TIMESTAMP = "ARRIVAL_TIMESTAMP";
+    public static final String ENCRYPTION_TYPE = "ENCRYPTION_TYPE";
+    public static final String PARTITION_KEY = "PARTITION_KEY";
+    public static final String SEQUENCE_NUMBER = "SEQUENCE_NUMBER";
+    public static final String EVENT_NAME = "EVENT_NAME";
+
+    private final Optional<String> key;
+    private final byte[] value;
+    private final Map<String, String> properties = new HashMap<String, String>();
+
+    /**
+     * Builds the record from the given KCL record.
+     *
+     * <p>For a {@link RecordAdapter} the key is the DynamoDB event id, and the event name and
+     * sequence number become properties. For any other record the key is the partition key,
+     * and the arrival timestamp, encryption type, partition key and sequence number become
+     * properties.
+     *
+     * <p>The value is the payload re-encoded as UTF-8 when the record reports no encryption
+     * type and the payload is valid UTF-8; otherwise the value is {@code null}.
+     *
+     * @param record the record received from the KCL worker
+     */
+    public StreamsRecord(com.amazonaws.services.kinesis.model.Record record) {
+        if (record instanceof RecordAdapter) {
+            com.amazonaws.services.dynamodbv2.model.Record dynamoRecord = ((RecordAdapter) record).getInternalObject();
+            this.key = Optional.of(dynamoRecord.getEventID());
+            setProperty(EVENT_NAME, dynamoRecord.getEventName());
+            setProperty(SEQUENCE_NUMBER, dynamoRecord.getDynamodb().getSequenceNumber());
+        } else {
+            this.key = Optional.of(record.getPartitionKey());
+            setProperty(ARRIVAL_TIMESTAMP, record.getApproximateArrivalTimestamp().toString());
+            setProperty(ENCRYPTION_TYPE, record.getEncryptionType());
+            setProperty(PARTITION_KEY, record.getPartitionKey());
+            setProperty(SEQUENCE_NUMBER, record.getSequenceNumber());
+        }
+
+        byte[] payload = null;
+        if (StringUtils.isBlank(record.getEncryptionType())) {
+            try {
+                // CharsetDecoder is stateful and not safe for concurrent use, so every record
+                // gets its own decoder instead of sharing one static instance between callers.
+                CharsetDecoder utf8Decoder = StandardCharsets.UTF_8.newDecoder();
+                // Encode with an explicit charset; the no-arg getBytes() depends on the
+                // platform default and could corrupt non-ASCII payloads.
+                payload = utf8Decoder.decode(record.getData()).toString().getBytes(StandardCharsets.UTF_8);
+            } catch (CharacterCodingException e) {
+                // Best effort: a payload that is not valid UTF-8 is exposed as a null value.
+            }
+        }
+        // Payloads with an encryption type are not interpreted here and stay null as well.
+        this.value = payload;
+    }
+
+    /**
+     * Adds or replaces a property on this record.
+     *
+     * @param key   the property name
+     * @param value the property value
+     */
+    public void setProperty(String key, String value) {
+        properties.put(key, value);
+    }
+}
diff --git a/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/StreamsRecordProcessor.java b/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/StreamsRecordProcessor.java
new file mode 100644
index 0000000..6d6aa83
--- /dev/null
+++ b/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/StreamsRecordProcessor.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.dynamodb;
+
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
+import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
+import com.amazonaws.services.kinesis.model.Record;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * KCL v1 record processor that wraps every received record in a {@link StreamsRecord},
+ * puts it on a shared queue and checkpoints the shard.
+ *
+ * <p>This is a direct adaptation of the kinesis record processor for kcl v1; no dynamo-specific logic.
+ */
+@Slf4j
+public class StreamsRecordProcessor implements IRecordProcessor {
+
+    private final int numRetries;
+    private final long checkpointInterval;
+    private final long backoffTime;
+
+    private final LinkedBlockingQueue<StreamsRecord> queue;
+    private long nextCheckpointTimeInNanos;
+    private String kinesisShardId;
+
+    /**
+     * Creates a processor that feeds the given queue.
+     *
+     * @param queue  queue that receives one {@link StreamsRecord} per processed record
+     * @param config supplies the checkpoint interval, retry count and backoff time
+     */
+    public StreamsRecordProcessor(LinkedBlockingQueue<StreamsRecord> queue, DynamoDBSourceConfig config) {
+        this.queue = queue;
+        this.checkpointInterval = config.getCheckpointInterval();
+        this.numRetries = config.getNumRetries();
+        this.backoffTime = config.getBackoffTime();
+        // System.nanoTime() has an arbitrary origin and may be negative, so the deadline is
+        // anchored to "now" rather than left at 0; the first batch still checkpoints.
+        this.nextCheckpointTimeInNanos = System.nanoTime();
+    }
+
+    /**
+     * Tries to checkpoint up to {@code numRetries} times, sleeping {@code backoffTime}
+     * milliseconds between attempts that failed with a transient error.
+     *
+     * @param checkpointer the checkpointer supplied by the KCL for the current call
+     */
+    private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
+        log.info("Checkpointing shard {}", kinesisShardId);
+
+        for (int i = 0; i < numRetries; i++) {
+            try {
+                checkpointer.checkpoint();
+                return;
+            } catch (ShutdownException se) {
+                // Ignore checkpoint if the processor instance has been shutdown.
+                log.info("Caught shutdown exception, skipping checkpoint.", se);
+                return;
+            } catch (InvalidStateException e) {
+                log.error("Cannot save checkpoint to the DynamoDB table.", e);
+                return;
+            } catch (ThrottlingException | KinesisClientLibDependencyException e) {
+                // Back off and re-attempt checkpoint upon transient failures
+                if (i >= (numRetries - 1)) {
+                    log.error("Checkpoint failed after {} attempts.", i + 1, e);
+                    return;
+                }
+            }
+
+            try {
+                Thread.sleep(backoffTime);
+            } catch (InterruptedException e) {
+                // Keep the interrupt visible to the caller and stop retrying.
+                Thread.currentThread().interrupt();
+                log.debug("Interrupted sleep", e);
+                return;
+            }
+        }
+    }
+
+    /**
+     * Remembers the shard id so that later log messages can name the shard.
+     */
+    @Override
+    public void initialize(InitializationInput initializationInput) {
+        kinesisShardId = initializationInput.getShardId();
+    }
+
+    /**
+     * Queues every record of the batch and checkpoints once the checkpoint interval elapsed.
+     */
+    @Override
+    public void processRecords(ProcessRecordsInput processRecordsInput) {
+        log.debug("Processing {} records from {}", processRecordsInput.getRecords().size(), kinesisShardId);
+
+        for (Record record : processRecordsInput.getRecords()) {
+            try {
+                queue.put(new StreamsRecord(record));
+            } catch (InterruptedException e) {
+                // The rest of the batch cannot be queued once interrupted; restore the flag
+                // and return without checkpointing records that never reached the queue.
+                Thread.currentThread().interrupt();
+                log.warn("Interrupted while queueing records from shard {}", kinesisShardId, e);
+                return;
+            }
+        }
+
+        // Checkpoint once every checkpoint interval. The subtraction form stays correct
+        // when System.nanoTime() is negative or wraps around.
+        if (System.nanoTime() - nextCheckpointTimeInNanos >= 0) {
+            checkpoint(processRecordsInput.getCheckpointer());
+            // NOTE(review): the interval is treated as milliseconds (as backoffTime is by
+            // Thread.sleep); confirm against the DynamoDBSourceConfig field documentation.
+            nextCheckpointTimeInNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(checkpointInterval);
+        }
+    }
+
+    /**
+     * Performs a final checkpoint attempt when the KCL shuts this processor down.
+     */
+    @Override
+    public void shutdown(ShutdownInput shutdownInput) {
+        log.info("Shutting down record processor for shard: {}", kinesisShardId);
+        checkpoint(shutdownInput.getCheckpointer());
+    }
+}
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsDefaultProviderChainPlugin.java b/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/StreamsRecordProcessorFactory.java
similarity index 51%
copy from pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsDefaultProviderChainPlugin.java
copy to pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/StreamsRecordProcessorFactory.java
index 6ed6b3b..545c2cd 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsDefaultProviderChainPlugin.java
+++ b/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/StreamsRecordProcessorFactory.java
@@ -16,32 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.kinesis;
+package org.apache.pulsar.io.dynamodb;
 
-import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
-import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
 
-import java.io.IOException;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
 
-public class AwsDefaultProviderChainPlugin implements AwsCredentialProviderPlugin {
-    @Override
-    public void init(String param) {
+import java.util.concurrent.LinkedBlockingQueue;
 
-    }
+public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {
 
-    @Override
-    public AWSCredentialsProvider getCredentialProvider() {
-        return new DefaultAWSCredentialsProviderChain();
+    private final LinkedBlockingQueue<StreamsRecord> queue;
+    private final DynamoDBSourceConfig config;
+    
+    public StreamsRecordProcessorFactory(LinkedBlockingQueue<StreamsRecord> queue,
+                                         DynamoDBSourceConfig kinesisSourceConfig) {
+        this.queue = queue;
+        this.config = kinesisSourceConfig;
     }
 
     @Override
-    public software.amazon.awssdk.auth.credentials.AwsCredentialsProvider getV2CredentialsProvider() {
-        return DefaultCredentialsProvider.create();
-    }
-
-    @Override
-    public void close() throws IOException {
-
+    public IRecordProcessor createProcessor() {
+        return new StreamsRecordProcessor(queue, config);
     }
 }
diff --git a/pulsar-io/dynamodb/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/dynamodb/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000..dd6ba97
--- /dev/null
+++ b/pulsar-io/dynamodb/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: dynamodb
+description: DynamoDB connectors
+sourceClass: org.apache.pulsar.io.dynamodb.DynamoDBSource
diff --git a/pulsar-io/dynamodb/src/test/java/org/apache/pulsar/io/dynamodb/DynamoDBSourceConfigTests.java b/pulsar-io/dynamodb/src/test/java/org/apache/pulsar/io/dynamodb/DynamoDBSourceConfigTests.java
new file mode 100644
index 0000000..2d7ac72
--- /dev/null
+++ b/pulsar-io/dynamodb/src/test/java/org/apache/pulsar/io/dynamodb/DynamoDBSourceConfigTests.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.dynamodb;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import org.testng.annotations.Test;
+
+/**
+ * Tests loading and validation of {@link DynamoDBSourceConfig}.
+ */
+public class DynamoDBSourceConfigTests {
+
+    // 2019-03-05T19:28:58Z, built from explicit UTC fields. A Calendar in the default time
+    // zone with only ZONE_OFFSET cleared still applies that zone's daylight-saving offset,
+    // which shifts the instant on machines where DST is active on this date.
+    private static final Date DAY =
+            Date.from(ZonedDateTime.of(2019, 3, 5, 19, 28, 58, 0, ZoneOffset.UTC).toInstant());
+
+    /** A config loaded from the YAML test resource exposes every configured value. */
+    @Test
+    public final void loadFromYamlFileTest() throws IOException {
+        File yamlFile = getFile("sourceConfig.yaml");
+        DynamoDBSourceConfig config = DynamoDBSourceConfig.load(yamlFile.getAbsolutePath());
+        assertNotNull(config);
+        assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws");
+        assertEquals(config.getAwsRegion(), "us-east-1");
+        assertEquals(config.getAwsDynamodbStreamArn(), "arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291");
+        assertEquals(config.getAwsCredentialPluginParam(),
+                "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
+        assertEquals(config.getApplicationName(), "My test application");
+        assertEquals(config.getCheckpointInterval(), 30000);
+        assertEquals(config.getBackoffTime(), 4000);
+        assertEquals(config.getNumRetries(), 3);
+        assertEquals(config.getReceiveQueueSize(), 2000);
+        assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.TRIM_HORIZON);
+
+        Calendar cal = Calendar.getInstance();
+        cal.setTime(config.getStartAtTime());
+        ZonedDateTime actual = ZonedDateTime.ofInstant(cal.toInstant(), ZoneOffset.UTC);
+        ZonedDateTime expected = ZonedDateTime.ofInstant(DAY.toInstant(), ZoneOffset.UTC);
+        assertEquals(actual, expected);
+    }
+
+    /** A config loaded from a map exposes every configured value. */
+    @Test
+    public final void loadFromMapTest() throws IOException {
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("awsEndpoint", "https://some.endpoint.aws");
+        map.put("awsRegion", "us-east-1");
+        map.put("awsDynamodbStreamArn", "arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291");
+        map.put("awsCredentialPluginParam", "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
+        map.put("checkpointInterval", "30000");
+        map.put("backoffTime", "4000");
+        map.put("numRetries", "3");
+        map.put("receiveQueueSize", 2000);
+        map.put("applicationName", "My test application");
+        map.put("initialPositionInStream", InitialPositionInStream.TRIM_HORIZON);
+        map.put("startAtTime", DAY);
+
+        DynamoDBSourceConfig config = DynamoDBSourceConfig.load(map);
+
+        assertNotNull(config);
+        assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws");
+        assertEquals(config.getAwsRegion(), "us-east-1");
+        assertEquals(config.getAwsDynamodbStreamArn(), "arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291");
+        assertEquals(config.getAwsCredentialPluginParam(),
+                "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
+        assertEquals(config.getApplicationName(), "My test application");
+        assertEquals(config.getCheckpointInterval(), 30000);
+        assertEquals(config.getBackoffTime(), 4000);
+        assertEquals(config.getNumRetries(), 3);
+        assertEquals(config.getReceiveQueueSize(), 2000);
+        assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.TRIM_HORIZON);
+
+        Calendar cal = Calendar.getInstance();
+        cal.setTime(config.getStartAtTime());
+        ZonedDateTime actual = ZonedDateTime.ofInstant(cal.toInstant(), ZoneOffset.UTC);
+        ZonedDateTime expected = ZonedDateTime.ofInstant(DAY.toInstant(), ZoneOffset.UTC);
+        assertEquals(actual, expected);
+    }
+
+    /** Opening the source without a credential plugin parameter is rejected. */
+    @Test(expectedExceptions = IllegalArgumentException.class,
+            expectedExceptionsMessageRegExp = "empty aws-credential param")
+    public final void missingCredentialsTest() throws Exception {
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("awsEndpoint", "https://some.endpoint.aws");
+        map.put("awsRegion", "us-east-1");
+        map.put("awsDynamodbStreamArn", "arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291");
+
+        DynamoDBSource source = new DynamoDBSource();
+        source.open(map, null);
+    }
+
+    /** Opening the source with AT_TIMESTAMP but no start time is rejected. */
+    @Test(expectedExceptions = IllegalArgumentException.class,
+            expectedExceptionsMessageRegExp = "Timestamp must be specified")
+    public final void missingStartTimeTest() throws Exception {
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("awsEndpoint", "https://some.endpoint.aws");
+        map.put("awsRegion", "us-east-1");
+        map.put("awsDynamodbStreamArn", "arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291");
+        map.put("awsCredentialPluginParam",
+                "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
+        map.put("initialPositionInStream", InitialPositionInStream.AT_TIMESTAMP);
+
+        DynamoDBSource source = new DynamoDBSource();
+        source.open(map, null);
+    }
+
+    /** Resolves a test resource through this class's class loader. */
+    private File getFile(String name) {
+        ClassLoader classLoader = getClass().getClassLoader();
+        return new File(classLoader.getResource(name).getFile());
+    }
+}
diff --git a/pulsar-io/dynamodb/src/test/resources/sourceConfig.yaml b/pulsar-io/dynamodb/src/test/resources/sourceConfig.yaml
new file mode 100644
index 0000000..c478ff0
--- /dev/null
+++ b/pulsar-io/dynamodb/src/test/resources/sourceConfig.yaml
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+{
+  "awsEndpoint" : "https://some.endpoint.aws",
+  "awsRegion": "us-east-1",
+  "awsDynamodbStreamArn": "arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291",
+  "awsCredentialPluginParam": "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
+  "applicationName": "My test application",
+  "checkpointInterval": "30000",
+  "backoffTime":"4000",
+  "numRetries":"3",
+  "receiveQueueSize": 2000,
+  "initialPositionInStream": "TRIM_HORIZON",
+  "startAtTime": "2019-03-05T19:28:58.000Z"
+}
\ No newline at end of file
diff --git a/pulsar-io/kinesis/pom.xml b/pulsar-io/kinesis/pom.xml
index 43c5ed0..363ae05 100644
--- a/pulsar-io/kinesis/pom.xml
+++ b/pulsar-io/kinesis/pom.xml
@@ -46,6 +46,12 @@
     </dependency>
 
     <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-aws</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
     </dependency>
@@ -89,18 +95,6 @@
       <artifactId>amazon-kinesis-producer</artifactId>
       <version>0.14.0</version>
     </dependency>
-
-    <dependency>
-      <groupId>com.amazonaws</groupId>
-      <artifactId>aws-java-sdk-sts</artifactId>
-      <version>1.11.619</version>
-    </dependency>
-
-    <dependency>
-      <groupId>software.amazon.awssdk</groupId>
-      <artifactId>sts</artifactId>
-      <version>2.10.56</version>
-    </dependency>
 	<!-- /kinesis dependencies -->
 
     <dependency>
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java
index 15ba7c5..23c3ec8 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java
@@ -19,58 +19,12 @@
 
 package org.apache.pulsar.io.kinesis;
 
-import java.io.Closeable;
-
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.auth.AWSSessionCredentials;
-import com.amazonaws.auth.BasicSessionCredentials;
-
 /**
- * Kinesis source/sink calls credential-provider while refreshing aws accessKey and secreKey. So, implementation
- * AwsCredentialProviderPlugin needs to makes sure to return non-expired keys when it requires.
+ * This is a stub class for backwards compatibility.  In new code and configurations, please use the plugins
+ * from org.apache.pulsar.io.aws
  *
+ * @see org.apache.pulsar.io.aws.AwsCredentialProviderPlugin
  */
-public interface AwsCredentialProviderPlugin extends Closeable {
-
-    /**
-     * accepts aws-account related param and initialize credential provider.
-     * 
-     * @param param
-     */
-    void init(String param);
-
-    /**
-     * Returned {@link AWSCredentialsProvider} can give {@link AWSCredentials} in case credential belongs to IAM user or
-     * it can return {@link BasicSessionCredentials} if user wants to generate temporary credential for a given IAM
-     * role.
-     * 
-     * @return
-     */
-    AWSCredentialsProvider getCredentialProvider();
-
-    /**
-     * Returns a V2 credential provider for use with the v2 SDK.
-     *
-     * Defaults to an implementation that pulls credentials from a v1 provider
-     */
-    default software.amazon.awssdk.auth.credentials.AwsCredentialsProvider getV2CredentialsProvider() {
-        // make a small wrapper to forward requests to v1, this allows
-        // for this interface to not "break" for implementers
-        AWSCredentialsProvider v1Provider = getCredentialProvider();
-        return () -> {
-            AWSCredentials creds = v1Provider.getCredentials();
-            if (creds instanceof AWSSessionCredentials) {
-                return software.amazon.awssdk.auth.credentials.AwsSessionCredentials.create(
-                        creds.getAWSAccessKeyId(),
-                        creds.getAWSSecretKey(),
-                        ((AWSSessionCredentials) creds).getSessionToken());
-            } else {
-                return software.amazon.awssdk.auth.credentials.AwsBasicCredentials.create(
-                        creds.getAWSAccessKeyId(),
-                        creds.getAWSSecretKey());
-            }
-        };
-    }
-
+@Deprecated
+public interface AwsCredentialProviderPlugin extends org.apache.pulsar.io.aws.AwsCredentialProviderPlugin {
 }
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsDefaultProviderChainPlugin.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsDefaultProviderChainPlugin.java
index 6ed6b3b..4fcf1e8 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsDefaultProviderChainPlugin.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsDefaultProviderChainPlugin.java
@@ -18,30 +18,12 @@
  */
 package org.apache.pulsar.io.kinesis;
 
-import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
-import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
-
-import java.io.IOException;
-
-public class AwsDefaultProviderChainPlugin implements AwsCredentialProviderPlugin {
-    @Override
-    public void init(String param) {
-
-    }
-
-    @Override
-    public AWSCredentialsProvider getCredentialProvider() {
-        return new DefaultAWSCredentialsProviderChain();
-    }
-
-    @Override
-    public software.amazon.awssdk.auth.credentials.AwsCredentialsProvider getV2CredentialsProvider() {
-        return DefaultCredentialsProvider.create();
-    }
-
-    @Override
-    public void close() throws IOException {
-
-    }
+/**
+ * This is a stub class for backwards compatibility.  In new code and configurations, please use the plugins
+ * from org.apache.pulsar.io.aws
+ *
+ * @see org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin
+ */
+@Deprecated
+public class AwsDefaultProviderChainPlugin extends org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin implements AwsCredentialProviderPlugin {
 }
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index aa06205..bf58cfa 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -24,23 +24,18 @@ import static com.google.common.util.concurrent.Futures.addCallback;
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 
-import com.amazonaws.auth.AWSCredentials;
 import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.auth.BasicAWSCredentials;
 import com.amazonaws.services.kinesis.producer.KinesisProducer;
 import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
 import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration.ThreadingModel;
 import com.amazonaws.services.kinesis.producer.UserRecordResult;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
 
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 
 import java.io.IOException;
-import java.lang.reflect.Constructor;
 import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -49,6 +44,8 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
 import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.aws.AbstractAwsConnector;
+import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
@@ -88,7 +85,7 @@ import org.slf4j.LoggerFactory;
     help = "A sink connector that copies messages from Pulsar to Kinesis",
     configClass = KinesisSinkConfig.class
 )
-public class KinesisSink extends AbstractKinesisConnector implements Sink<byte[]> {
+public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
 
     private static final Logger LOG = LoggerFactory.getLogger(KinesisSink.class);
 
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSource.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSource.java
index e97d44c..598f3af 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSource.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSource.java
@@ -27,6 +27,8 @@ import java.util.UUID;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.io.aws.AbstractAwsConnector;
+import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
 import org.apache.pulsar.io.core.Source;
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.core.annotations.Connector;
@@ -51,7 +53,7 @@ import software.amazon.kinesis.retrieval.polling.PollingConfig;
         configClass = KinesisSourceConfig.class
     )
 @Slf4j
-public class KinesisSource extends AbstractKinesisConnector implements Source<byte[]> {
+public class KinesisSource extends AbstractAwsConnector implements Source<byte[]> {
 
     private LinkedBlockingQueue<KinesisRecord> queue;
     private KinesisSourceConfig kinesisSourceConfig;
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java
index 8f31996..d75b521 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java
@@ -25,6 +25,7 @@ import java.net.URI;
 import java.util.Date;
 import java.util.Map;
 
+import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
 import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/STSAssumeRoleProviderPlugin.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/STSAssumeRoleProviderPlugin.java
index e3d133b..04d3e55 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/STSAssumeRoleProviderPlugin.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/STSAssumeRoleProviderPlugin.java
@@ -18,47 +18,13 @@
  */
 package org.apache.pulsar.io.kinesis;
 
-import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-import software.amazon.awssdk.services.sts.StsClient;
-import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
-
-import java.io.IOException;
-import java.util.Map;
-
-public class STSAssumeRoleProviderPlugin implements AwsCredentialProviderPlugin {
-    public static final String ASSUME_ROLE_ARN = "roleArn";
-    public static final String ASSUME_ROLE_SESSION_NAME = "roleSessionName";
-
-    private String roleArn;
-    private String roleSessionName;
-
-    @Override
-    public void init(String param) {
-        Map<String, String> credentialMap = new Gson().fromJson(param,
-                new TypeToken<Map<String, String>>() {
-                }.getType());
-
-        roleArn = credentialMap.get(ASSUME_ROLE_ARN);
-        roleSessionName = credentialMap.get(ASSUME_ROLE_SESSION_NAME);
-    }
-
-    @Override
-    public AWSCredentialsProvider getCredentialProvider() {
-        return new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, roleSessionName).build();
-    }
-
-    @Override
-    public software.amazon.awssdk.auth.credentials.AwsCredentialsProvider getV2CredentialsProvider() {
-        StsClient client = StsClient.create();
-        return StsAssumeRoleCredentialsProvider.builder().stsClient(client).refreshRequest((req) -> {
-            req.roleArn(roleArn).roleSessionName(roleSessionName).build();
-        }).build();
-    }
-
-    @Override
-    public void close() throws IOException {
-    }
+/**
+ * This is a stub class for backwards compatibility.  In new code and configurations, please use the plugins
+ * from org.apache.pulsar.io.aws
+ *
+ * @see org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin
+ */
+@Deprecated
+public class STSAssumeRoleProviderPlugin extends org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin implements AwsCredentialProviderPlugin {
 }
+
diff --git a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkTest.java b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkTest.java
index 2f07d11..7172c04 100644
--- a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkTest.java
+++ b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.io.kinesis;
 import java.io.IOException;
 import java.util.Map;
 
+import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import org.testng.collections.Maps;
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index c1f7ff4..2e875e4 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -35,6 +35,7 @@
     <module>core</module>
     <module>common</module>
     <module>docs</module>
+    <module>aws</module>
     <module>twitter</module>
     <module>cassandra</module>
     <module>aerospike</module>
@@ -57,6 +58,7 @@
     <module>redis</module>
     <module>solr</module>
     <module>influxdb</module>
+    <module>dynamodb</module>
   </modules>
 
 </project>
diff --git a/site2/docs/io-connectors.md b/site2/docs/io-connectors.md
index 2e84477..9f6aa3b 100644
--- a/site2/docs/io-connectors.md
+++ b/site2/docs/io-connectors.md
@@ -45,6 +45,11 @@ Pulsar has various source connectors, which are sorted alphabetically as below.
 
 * [Java class](https://github.com/apache/pulsar/blob/master/pulsar-io/debezium/mongodb/src/main/java/org/apache/pulsar/io/debezium/mongodb/DebeziumMongoDbSource.java)
 
+### DynamoDB
+
+* [Configuration](io-dynamodb-source.md#configuration)
+  
+* [Java class](https://github.com/apache/pulsar/blob/master/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSource.java)
 
 ### File
 
diff --git a/site2/docs/io-kinesis-source.md b/site2/docs/io-dynamodb-source.md
similarity index 51%
copy from site2/docs/io-kinesis-source.md
copy to site2/docs/io-dynamodb-source.md
index b0c77ec..cdaafd2 100644
--- a/site2/docs/io-kinesis-source.md
+++ b/site2/docs/io-dynamodb-source.md
@@ -1,19 +1,19 @@
 ---
-id: io-kinesis-source
-title: Kinesis source connector
-sidebar_label: Kinesis source connector
+id: io-dynamodb-source
+title: AWS DynamoDB source connector
+sidebar_label: AWS DynamoDB source connector
 ---
 
-The Kinesis source connector pulls data from Amazon Kinesis and persists data into Pulsar.
+The DynamoDB source connector pulls data from DynamoDB table streams and persists data into Pulsar.
 
-This connector uses the [Kinesis Consumer Library](https://github.com/awslabs/amazon-kinesis-client) (KCL) to do the actual consuming of messages. The KCL uses DynamoDB to track state for consumers.
-
-> Note: currently, the Kinesis source connector only supports raw messages. If you use KMS encrypted messages, the encrypted messages are sent to downstream. This connector will support decrypting messages in the future release.
+This connector uses the [DynamoDB Streams Kinesis Adapter](https://github.com/awslabs/dynamodb-streams-kinesis-adapter),
+which uses the [Kinesis Consumer Library](https://github.com/awslabs/amazon-kinesis-client) (KCL) to do the actual
+consuming of messages. The KCL uses DynamoDB to track state for consumers and requires CloudWatch access to log metrics.
 
 
 ## Configuration
 
-The configuration of the Kinesis source connector has the following properties.
+The configuration of the DynamoDB source connector has the following properties.
 
 ### Property
 
@@ -21,23 +21,22 @@ The configuration of the Kinesis source connector has the following properties.
 |------|----------|----------|---------|-------------|
 `initialPositionInStream`|InitialPositionInStream|false|LATEST|The position where the connector starts from.<br/><br/>Below are the available options:<br/><br/><li>`AT_TIMESTAMP`: start from the record at or after the specified timestamp.<br/><br/><li>`LATEST`: start after the most recent data record.<br/><br/><li>`TRIM_HORIZON`: start from the oldest available data record.
 `startAtTime`|Date|false|" " (empty string)|If set to `AT_TIMESTAMP`, it specifies the point in time to start consumption.
-`applicationName`|String|false|Pulsar IO connector|The name of the Amazon Kinesis application. <br/><br/>By default, the application name is included in the user agent string used to make AWS requests. This can assist with troubleshooting, for example, distinguish requests made by separate connector instances.
-`checkpointInterval`|long|false|60000|The frequency of the Kinesis stream checkpoint in milliseconds.
+`applicationName`|String|false|Pulsar IO connector|The name of the KCL application. It must be unique, because it is used as the name of the DynamoDB table that tracks consumer state. <br/><br/>By default, the application name is included in the user agent string used to make AWS requests. This can assist with troubleshooting, for example, distinguishing requests made by separate connector instances.
+`checkpointInterval`|long|false|60000|The frequency of the KCL checkpoint in milliseconds.
 `backoffTime`|long|false|3000|The amount of time to delay between requests when the connector encounters a throttling exception from AWS Kinesis in milliseconds.
 `numRetries`|int|false|3|The number of re-attempts when the connector encounters an exception while trying to set a checkpoint.
 `receiveQueueSize`|int|false|1000|The maximum number of AWS records that can be buffered inside the connector. <br/><br/>Once the `receiveQueueSize` is reached, the connector does not consume any messages from Kinesis until some messages in the queue are successfully consumed.
 `dynamoEndpoint`|String|false|" " (empty string)|The Dynamo end-point URL, which can be found at [here](https://docs.aws.amazon.com/general/latest/gr/rande.html).
 `cloudwatchEndpoint`|String|false|" " (empty string)|The Cloudwatch end-point URL, which can be found at [here](https://docs.aws.amazon.com/general/latest/gr/rande.html).
-`useEnhancedFanOut`|boolean|false|true|If set to true, it uses Kinesis enhanced fan-out.<br><br>If set to false, it uses polling.
-`awsEndpoint`|String|false|" " (empty string)|The Kinesis end-point URL, which can be found at [here](https://docs.aws.amazon.com/general/latest/gr/rande.html).
+`awsEndpoint`|String|false|" " (empty string)|The DynamoDB Streams end-point URL, which can be found at [here](https://docs.aws.amazon.com/general/latest/gr/rande.html).
 `awsRegion`|String|false|" " (empty string)|The AWS region. <br/><br/>**Example**<br/> us-west-1, us-west-2
-`awsKinesisStreamName`|String|true|" " (empty string)|The Kinesis stream name.
-`awsCredentialPluginName`|String|false|" " (empty string)|The fully-qualified class name of implementation of {@inject: github:`AwsCredentialProviderPlugin`:/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java}.<br><br>`awsCredentialProviderPlugin` has the following built-in plugs:<br><br><li>`org.apache.pulsar.io.kinesis.AwsDefaultProviderChainPlugin`:<br> this plugin uses the default AWS provider chain.<br>For more information, see [using the d [...]
+`awsDynamodbStreamArn`|String|true|" " (empty string)|The ARN of the DynamoDB stream.
+`awsCredentialPluginName`|String|false|" " (empty string)|The fully-qualified class name of implementation of {@inject: github:`AwsCredentialProviderPlugin`:/pulsar-io/aws/src/main/java/org/apache/pulsar/io/aws/AwsCredentialProviderPlugin.java}.<br><br>`awsCredentialProviderPlugin` has the following built-in plugins:<br><br><li>`org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin`:<br> this plugin uses the default AWS provider chain.<br>For more information, see [using the default c [...]
 `awsCredentialPluginParam`|String |false|" " (empty string)|The JSON parameter to initialize `awsCredentialsProviderPlugin`.
 
 ### Example
 
-Before using the Kinesis source connector, you need to create a configuration file through one of the following methods.
+Before using the DynamoDB source connector, you need to create a configuration file through one of the following methods.
 
 * JSON 
 
@@ -45,7 +44,7 @@ Before using the Kinesis source connector, you need to create a configuration fi
     {
         "awsEndpoint": "https://some.endpoint.aws",
         "awsRegion": "us-east-1",
-        "awsKinesisStreamName": "my-stream",
+        "awsDynamodbStreamArn": "arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291",
         "awsCredentialPluginParam": "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
         "applicationName": "My test application",
         "checkpointInterval": "30000",
@@ -63,7 +62,7 @@ Before using the Kinesis source connector, you need to create a configuration fi
     configs:
         awsEndpoint: "https://some.endpoint.aws"
         awsRegion: "us-east-1"
-        awsKinesisStreamName: "my-stream"
+        awsDynamodbStreamArn: "arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291"
         awsCredentialPluginParam: "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}"
         applicationName: "My test application"
         checkpointInterval: 30000
diff --git a/site2/docs/io-dynamodb.md b/site2/docs/io-dynamodb.md
new file mode 100644
index 0000000..502f9f0
--- /dev/null
+++ b/site2/docs/io-dynamodb.md
@@ -0,0 +1,5 @@
+---
+id: io-dynamodb
+title: AWS DynamoDB Connector
+sidebar_label: AWS DynamoDB Connector
+---
\ No newline at end of file
diff --git a/site2/docs/io-kinesis-sink.md b/site2/docs/io-kinesis-sink.md
index 7b7c012..0d0f739 100644
--- a/site2/docs/io-kinesis-sink.md
+++ b/site2/docs/io-kinesis-sink.md
@@ -19,20 +19,20 @@ The configuration of the Kinesis sink connector has the following property.
 `awsEndpoint`|String|false|" " (empty string)|The Kinesis end-point URL, which can be found at [here](https://docs.aws.amazon.com/general/latest/gr/rande.html).
 `awsRegion`|String|false|" " (empty string)|The AWS region. <br/><br/>**Example**<br/> us-west-1, us-west-2
 `awsKinesisStreamName`|String|true|" " (empty string)|The Kinesis stream name.
-`awsCredentialPluginName`|String|false|" " (empty string)|The fully-qualified class name of implementation of {@inject: github:`AwsCredentialProviderPlugin`:/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java}. <br/><br/>It is a factory class which creates an AWSCredentialsProvider that is used by Kinesis sink. <br/><br/>If it is empty, the Kinesis sink creates a default AWSCredentialsProvider which accepts json-map of credentials in `awsCredent [...]
+`awsCredentialPluginName`|String|false|" " (empty string)|The fully-qualified class name of implementation of {@inject: github:`AwsCredentialProviderPlugin`:/pulsar-io/aws/src/main/java/org/apache/pulsar/io/aws/AwsCredentialProviderPlugin.java}. <br/><br/>It is a factory class which creates an AWSCredentialsProvider that is used by Kinesis sink. <br/><br/>If it is empty, the Kinesis sink creates a default AWSCredentialsProvider which accepts json-map of credentials in `awsCredentialPlugi [...]
 `awsCredentialPluginParam`|String |false|" " (empty string)|The JSON parameter to initialize `awsCredentialsProviderPlugin`.
 
 ### Built-in plugins
 
 The following are built-in `AwsCredentialProviderPlugin` plugins:
 
-* `org.apache.pulsar.io.kinesis.AwsDefaultProviderChainPlugin`
+* `org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin`
   
     This plugin takes no configuration, it uses the default AWS provider chain. 
     
     For more information, see [AWS documentation](https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default).
 
-* `org.apache.pulsar.io.kinesis.STSAssumeRoleProviderPlugin`
+* `org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin`
   
     This plugin takes a configuration (via the `awsCredentialPluginParam`) that describes a role to assume when running the KCL.
 
diff --git a/site2/docs/io-kinesis-source.md b/site2/docs/io-kinesis-source.md
index b0c77ec..863c480 100644
--- a/site2/docs/io-kinesis-source.md
+++ b/site2/docs/io-kinesis-source.md
@@ -32,7 +32,7 @@ The configuration of the Kinesis source connector has the following properties.
 `awsEndpoint`|String|false|" " (empty string)|The Kinesis end-point URL, which can be found at [here](https://docs.aws.amazon.com/general/latest/gr/rande.html).
 `awsRegion`|String|false|" " (empty string)|The AWS region. <br/><br/>**Example**<br/> us-west-1, us-west-2
 `awsKinesisStreamName`|String|true|" " (empty string)|The Kinesis stream name.
-`awsCredentialPluginName`|String|false|" " (empty string)|The fully-qualified class name of implementation of {@inject: github:`AwsCredentialProviderPlugin`:/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java}.<br><br>`awsCredentialProviderPlugin` has the following built-in plugs:<br><br><li>`org.apache.pulsar.io.kinesis.AwsDefaultProviderChainPlugin`:<br> this plugin uses the default AWS provider chain.<br>For more information, see [using the d [...]
+`awsCredentialPluginName`|String|false|" " (empty string)|The fully-qualified class name of implementation of {@inject: github:`AwsCredentialProviderPlugin`:/pulsar-io/aws/src/main/java/org/apache/pulsar/io/aws/AwsCredentialProviderPlugin.java}.<br><br>`awsCredentialProviderPlugin` has the following built-in plugins:<br><br><li>`org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin`:<br> this plugin uses the default AWS provider chain.<br>For more information, see [using the default c [...]
 `awsCredentialPluginParam`|String |false|" " (empty string)|The JSON parameter to initialize `awsCredentialsProviderPlugin`.
 
 ### Example
diff --git a/site2/website/data/connectors.js b/site2/website/data/connectors.js
index c382e32..6a29011 100644
--- a/site2/website/data/connectors.js
+++ b/site2/website/data/connectors.js
@@ -36,6 +36,12 @@ module.exports = [
         link: 'https://debezium.io/'
     },
     {
+        name: 'dynamodb',
+        longName: 'AWS DynamoDB source',
+        type: 'Source',
+        link: 'https://aws.amazon.com/dynamodb/'
+    },
+    {
         name: 'elastic-search',
         longName: 'ElasticSearch sink',
         type: 'Sink',