Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/05/04 22:11:24 UTC

[GitHub] [pulsar] maths22 opened a new pull request #6874: Add dynamodb streams source

maths22 opened a new pull request #6874:
URL: https://github.com/apache/pulsar/pull/6874


   ### Motivation
   
   The goal is to allow consuming DynamoDB Streams directly into Pulsar.
   
   ### Modifications
   
   I created a new source for DynamoDB. It shares less code with the Kinesis source than would be ideal, because the DynamoDB Streams Kinesis adapter supports only KCL v1.x, while the Kinesis source uses KCL v2.x. I also abstracted the AWS credential management pieces into their own package.
   
   ### Verifying this change
   
   TODO
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): yes
       (Upgrades the AWS SDK to a newer patch version that supports DynamoDB on-demand tables)
     - The public API: no
     - The schema: no
     - The default values of configurations: no
     - The wire protocol: no
     - The rest endpoints: no
     - The admin cli options: no
     - Anything that affects deployment: no
   
   ### Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? docs (to be written before taking out of draft status)
     - If a feature is not applicable for documentation, explain why? n/a
     - If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] addisonj commented on a change in pull request #6874: Add dynamodb streams source

Posted by GitBox <gi...@apache.org>.
addisonj commented on a change in pull request #6874:
URL: https://github.com/apache/pulsar/pull/6874#discussion_r419776392



##########
File path: pulsar-io/dynamodb/pom.xml
##########
@@ -0,0 +1,105 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-io</artifactId>
+    <version>2.6.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>pulsar-io-dynamodb</artifactId>
+  <name>Pulsar IO :: DynamoDB</name>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-functions-instance</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-aws</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-yaml</artifactId>
+    </dependency>
+
+    <!-- add cbor for kinesis-client to fix dep conflict -->
+    <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-cbor</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+    </dependency>
+
+    <!-- dynamodb dependencies -->
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>dynamodb-streams-kinesis-adapter</artifactId>

Review comment:
       does pulling this in get the `amazon-kinesis-client` library via a transitive dep?







[GitHub] [pulsar] addisonj commented on pull request #6874: Add dynamodb streams source

Posted by GitBox <gi...@apache.org>.
addisonj commented on pull request #6874:
URL: https://github.com/apache/pulsar/pull/6874#issuecomment-623751409


   Oh, one more thing: if you want to get this into a really good place, you can add the docs to `site2/docs/io-connectors.md` and probably a new doc in `site2/docs/io-dynamodb-streams.md`.
   
   





[GitHub] [pulsar] maths22 commented on pull request #6874: Add dynamodb streams source

Posted by GitBox <gi...@apache.org>.
maths22 commented on pull request #6874:
URL: https://github.com/apache/pulsar/pull/6874#issuecomment-630321779


   /pulsarbot run-failure-checks





[GitHub] [pulsar] vzhikserg commented on a change in pull request #6874: Add dynamodb streams source

Posted by GitBox <gi...@apache.org>.
vzhikserg commented on a change in pull request #6874:
URL: https://github.com/apache/pulsar/pull/6874#discussion_r422675341



##########
File path: pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSource.java
##########
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.dynamodb;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
+import java.net.InetAddress;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Stream;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
+import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
+import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerFactory;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.io.aws.AbstractAwsConnector;
+import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
+import org.apache.pulsar.io.core.Source;
+import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+
+/**
+ * 

Review comment:
       This Java comment can be removed, or a meaningful description should be added.
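A meaningful description could summarize how the pieces fit together. The imports above and the `StreamsRecordProcessor(LinkedBlockingQueue<StreamsRecord> queue, ...)` constructor in this PR suggest a particular design: the KCL v1 worker's record processors push records onto a queue, and the source drains that queue. Below is a minimal, JDK-only sketch of that handoff, with a possible JavaDoc. The class `QueueBackedSourceSketch`, its methods, and the bounded capacity are hypothetical. This is not the PR's actual `DynamoDBSource`.

```java
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Pulsar source that reads change records from a DynamoDB stream.
 *
 * <p>A KCL v1 worker (via the DynamoDB Streams Kinesis adapter) runs one record
 * processor per shard. Each processor enqueues records, and {@link #read()}
 * hands them to Pulsar one at a time, blocking until one is available.
 */
class QueueBackedSourceSketch<T> {
    // Assumption: a bounded queue, so slow consumption blocks the processor threads.
    private final LinkedBlockingQueue<T> queue;

    QueueBackedSourceSketch(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    /** Called from record-processor threads; blocks while the queue is full. */
    void enqueue(T record) throws InterruptedException {
        queue.put(record);
    }

    /** Called by the Pulsar runtime; blocks until a record is available. */
    T read() throws InterruptedException {
        return queue.take();
    }
}
```

Records come out in the order they were enqueued. Ordering is therefore preserved per processor thread, but not across shards.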

##########
File path: pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/StreamsRecord.java
##########
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.dynamodb;
+
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter;
+import org.apache.pulsar.functions.api.Record;
+import software.amazon.awssdk.utils.StringUtils;
+
+// This is a direct adaptation of the kinesis record for kcl v1; no dynamo-specific logic
+public class StreamsRecord implements Record<byte[]> {
+    
+    public static final String ARRIVAL_TIMESTAMP = "ARRIVAL_TIMESTAMP";
+    public static final String ENCRYPTION_TYPE = "ENCRYPTION_TYPE";
+    public static final String PARTITION_KEY = "PARTITION_KEY";
+    public static final String SEQUENCE_NUMBER = "SEQUENCE_NUMBER";
+    public static final String EVENT_NAME = "EVENT_NAME";
+
+    private static final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
+    private final Optional<String> key;
+    private final byte[] value;
+    private final HashMap<String, String> userProperties = new HashMap<String, String> ();
+    
+    public StreamsRecord(com.amazonaws.services.kinesis.model.Record record) {
+//        special-case the handling of dynamo records
+        if (record instanceof RecordAdapter) {
+            com.amazonaws.services.dynamodbv2.model.Record dynamoRecord = ((RecordAdapter) record).getInternalObject();
+            this.key = Optional.of(dynamoRecord.getEventID());
+            setProperty(EVENT_NAME, dynamoRecord.getEventName());
+            setProperty(SEQUENCE_NUMBER, dynamoRecord.getDynamodb().getSequenceNumber());
+        } else {
+            this.key = Optional.of(record.getPartitionKey());
+            setProperty(ARRIVAL_TIMESTAMP, record.getApproximateArrivalTimestamp().toString());
+            setProperty(ENCRYPTION_TYPE, record.getEncryptionType());
+            setProperty(PARTITION_KEY, record.getPartitionKey());
+            setProperty(SEQUENCE_NUMBER, record.getSequenceNumber());
+
+        }
+
+        if (StringUtils.isBlank(record.getEncryptionType())) {
+            String s = null;
+            try {
+                s = decoder.decode(record.getData()).toString();
+            } catch (CharacterCodingException e) {
+               // Ignore
+            }
+            this.value = (s != null) ? s.getBytes() : null;
+        } else {
+            // Who knows?
+            this.value = null;
+        }
+    }
+
+    @Override
+    public Optional<String> getKey() {

Review comment:
       These boilerplate getters can be generated by the Lombok annotation — `@Getter`
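Separately from the getter suggestion, the decoding branch in the constructor above shares one static `CharsetDecoder` across all instances. `CharsetDecoder` is not safe for concurrent use, and a KCL worker typically processes several shards on different threads, so concurrent `StreamsRecord` construction could interfere. The round trip through `String` also re-encodes with the platform default charset via `s.getBytes()`.

Below is a minimal JDK-only sketch of the same "valid UTF-8 bytes, or null" behavior. It uses a fresh decoder per call. The helper `Utf8Values.utf8OrNull` is a hypothetical name, not part of the PR.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.StandardCharsets;

class Utf8Values {
    /**
     * Returns the bytes of {@code data} if they are valid UTF-8, otherwise null.
     * A new decoder is created per call because CharsetDecoder is not thread-safe,
     * and a duplicate buffer is decoded so the caller's position is left untouched.
     */
    static byte[] utf8OrNull(ByteBuffer data) {
        // newDecoder() reports malformed input by default, so invalid bytes throw.
        CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
        try {
            decoder.decode(data.duplicate());
        } catch (CharacterCodingException e) {
            return null; // not valid UTF-8, mirroring the original "Ignore" branch
        }
        // Valid UTF-8: copying the bytes equals decoding and re-encoding as UTF-8.
        ByteBuffer view = data.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }
}
```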

##########
File path: pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSourceConfig.java
##########
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.dynamodb;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.Date;
+import java.util.Map;
+
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
+import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
+import com.amazonaws.services.dynamodbv2.*;

Review comment:
       The recommendation for this project is to use separate import statements instead of the wildcard `*`.

##########
File path: pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/StreamsRecord.java
##########
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.dynamodb;
+
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter;
+import org.apache.pulsar.functions.api.Record;
+import software.amazon.awssdk.utils.StringUtils;
+
+// This is a direct adaptation of the kinesis record for kcl v1; no dynamo-specific logic

Review comment:
       It can be converted to JavaDoc
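As JavaDoc, the comment might read as shown in the sketch. Despite the "no dynamo-specific logic" wording, the constructor shown earlier does special-case `RecordAdapter`. For those records it uses the event ID as the key and sets the `EVENT_NAME` and `SEQUENCE_NUMBER` properties, so the doc could say so.

The class is a JDK-only stand-in. `StreamsRecordPropertiesSketch` and its constructor are hypothetical and mirror only the DynamoDB branch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Pulsar record wrapping a record delivered by KCL v1.
 *
 * <p>Adapted from the Kinesis source's record class. DynamoDB Streams records,
 * which the streams adapter delivers as {@code RecordAdapter}, use the event ID
 * as the key and carry {@code EVENT_NAME} and {@code SEQUENCE_NUMBER} properties;
 * plain Kinesis records keep the Kinesis-specific properties.
 */
class StreamsRecordPropertiesSketch {
    static final String EVENT_NAME = "EVENT_NAME";
    static final String SEQUENCE_NUMBER = "SEQUENCE_NUMBER";

    private final Optional<String> key;
    private final Map<String, String> properties = new HashMap<>();

    /** Mirrors the RecordAdapter branch: key = event ID, plus two properties. */
    StreamsRecordPropertiesSketch(String eventId, String eventName, String sequenceNumber) {
        this.key = Optional.of(eventId);
        properties.put(EVENT_NAME, eventName);
        properties.put(SEQUENCE_NUMBER, sequenceNumber);
    }

    Optional<String> getKey() {
        return key;
    }

    Map<String, String> getProperties() {
        return properties;
    }
}
```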

##########
File path: pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/StreamsRecordProcessor.java
##########
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.dynamodb;
+
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
+import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
+import com.amazonaws.services.kinesis.model.Record;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+@Slf4j
+// This is a direct adaptation of the kinesis record processor for kcl v1; no dynamo-specific logic
+public class StreamsRecordProcessor implements IRecordProcessor {
+
+    private final int numRetries;
+    private final long checkpointInterval;
+    private final long backoffTime;
+
+    private final LinkedBlockingQueue<StreamsRecord> queue;
+    private long nextCheckpointTimeInNanos;
+    private String kinesisShardId;
+    
+    public StreamsRecordProcessor(LinkedBlockingQueue<StreamsRecord> queue, DynamoDBSourceConfig config) {
+        this.queue = queue;
+        this.checkpointInterval = config.getCheckpointInterval();
+        this.numRetries = config.getNumRetries();
+        this.backoffTime = config.getBackoffTime();
+    }
+
+    private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
+        log.info("Checkpointing shard " + kinesisShardId);
+        
+        for (int i = 0; i < numRetries; i++) {
+            try {
+                checkpointer.checkpoint();
+                break;
+            } catch (ShutdownException se) {
+                // Ignore checkpoint if the processor instance has been shutdown.
+                log.info("Caught shutdown exception, skipping checkpoint.", se);
+                break;
+            } catch (InvalidStateException e) {
+                log.error("Cannot save checkpoint to the DynamoDB table.", e);
+                break;
+            } catch (ThrottlingException | KinesisClientLibDependencyException e) {
+                // Back off and re-attempt checkpoint upon transient failures
+                if (i >= (numRetries - 1)) {
+                    log.error("Checkpoint failed after " + (i + 1) + " attempts.", e);
+                    break;
+                }
+            }
+
+            try {
+                Thread.sleep(backoffTime);
+            } catch (InterruptedException e) {
+                log.debug("Interrupted sleep", e);
+            }
+        }
+    }
+
+    @Override
+    public void initialize(InitializationInput initializationInput) {
+        kinesisShardId = initializationInput.getShardId();
+    }
+
+    @Override
+    public void processRecords(ProcessRecordsInput processRecordsInput) {
+
+        log.info("Processing {} records from {}", processRecordsInput.getRecords().size(), kinesisShardId);

Review comment:
       I'm wondering how much logging it will produce 🤔 
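On the log-volume question: `processRecords` is invoked once per fetched batch for every shard. An INFO line there therefore grows with the number of shards and the polling rate. One option is to demote it to DEBUG. Another is to emit a periodic per-shard summary.

Below is a minimal JDK-only sketch of the second option. The `BatchLogSummary` class is hypothetical, and the clock is passed in explicitly so the behavior is deterministic.

```java
/**
 * Accumulates per-shard record counts and returns a summary line at most once
 * per interval, instead of logging every batch.
 */
class BatchLogSummary {
    private final String shardId;
    private final long intervalMillis;
    private long windowStartMillis;
    private long batches;
    private long records;

    BatchLogSummary(String shardId, long intervalMillis, long nowMillis) {
        this.shardId = shardId;
        this.intervalMillis = intervalMillis;
        this.windowStartMillis = nowMillis;
    }

    /** Counts one batch; returns a summary once the interval has elapsed, else null. */
    String onBatch(int recordCount, long nowMillis) {
        batches++;
        records += recordCount;
        if (nowMillis - windowStartMillis < intervalMillis) {
            return null;
        }
        String summary = String.format("Processed %d records in %d batches from %s",
                records, batches, shardId);
        // Start a new window after emitting the summary.
        windowStartMillis = nowMillis;
        batches = 0;
        records = 0;
        return summary;
    }
}
```

In the processor, a non-null return value would go to `log.info`, and the per-batch line could move to `log.debug`.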







[GitHub] [pulsar] codelipenghui merged pull request #6874: Add dynamodb streams source

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #6874:
URL: https://github.com/apache/pulsar/pull/6874


   





[GitHub] [pulsar] sijie commented on a change in pull request #6874: Add dynamodb streams source

Posted by GitBox <gi...@apache.org>.
sijie commented on a change in pull request #6874:
URL: https://github.com/apache/pulsar/pull/6874#discussion_r426932512



##########
File path: pulsar-io/aws/src/main/java/org/apache/pulsar/io/aws/AbstractAwsConnector.java
##########
@@ -79,16 +77,19 @@ public static AwsCredentialProviderPlugin createCredentialProviderWithPlugin(Str
      * @param awsCredentialPluginParam
      * @return
      */
-    protected AwsCredentialProviderPlugin defaultCredentialProvider(String awsCredentialPluginParam) {
+    public AwsCredentialProviderPlugin defaultCredentialProvider(String awsCredentialPluginParam) {
         Map<String, String> credentialMap = new Gson().fromJson(awsCredentialPluginParam,
                 new TypeToken<Map<String, String>>() {
                 }.getType());
         String accessKey = credentialMap.get(ACCESS_KEY_NAME);
         String secretKey = credentialMap.get(SECRET_KEY_NAME);
-        checkArgument(isNotBlank(accessKey) && isNotBlank(secretKey),
-                String.format(
-                        "Default %s and %s must be present into json-map if AwsCredentialProviderPlugin not provided",
-                        ACCESS_KEY_NAME, SECRET_KEY_NAME));
+        if (!(StringUtils.isNotBlank(accessKey) && StringUtils.isNotBlank(secretKey))) {

Review comment:
       Just out of curiosity, what was changed here?
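For context on the question: Guava's `checkArgument(boolean, Object)` throws `IllegalArgumentException` when the condition is false. In the removed lines, though, the `String.format` message is built on every call, even when validation succeeds. An explicit `if` keeps the same contract, formats the message only on failure, and needs no Guava `Preconditions` import in the `pulsar-io/aws` module. The hunk does not say which of these motivated the change.

The body of the new `if` is cut off, so this sketch assumes it throws `IllegalArgumentException`. Several other details are also assumptions:

- the key names `accessKey`/`secretKey`;
- the hypothetical class `CredentialParamCheck`;
- the whitespace-based stand-in for commons-lang3 `isNotBlank`.

The JSON is taken as an already-parsed map instead of going through Gson.

```java
import java.util.Map;

class CredentialParamCheck {
    // Assumed values; the real constants are ACCESS_KEY_NAME and SECRET_KEY_NAME.
    static final String ACCESS_KEY_NAME = "accessKey";
    static final String SECRET_KEY_NAME = "secretKey";

    // Stand-in for commons-lang3 StringUtils.isNotBlank: non-null and not all whitespace.
    static boolean isNotBlank(String s) {
        return s != null && !s.chars().allMatch(Character::isWhitespace);
    }

    /** Explicit-if equivalent of the removed checkArgument call. */
    static void validate(Map<String, String> credentialMap) {
        String accessKey = credentialMap.get(ACCESS_KEY_NAME);
        String secretKey = credentialMap.get(SECRET_KEY_NAME);
        if (!(isNotBlank(accessKey) && isNotBlank(secretKey))) {
            // The message is only formatted when validation actually fails.
            throw new IllegalArgumentException(String.format(
                    "Default %s and %s must be present into json-map if AwsCredentialProviderPlugin not provided",
                    ACCESS_KEY_NAME, SECRET_KEY_NAME));
        }
    }
}
```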







[GitHub] [pulsar] vzhikserg commented on a change in pull request #6874: Add dynamodb streams source

Posted by GitBox <gi...@apache.org>.
vzhikserg commented on a change in pull request #6874:
URL: https://github.com/apache/pulsar/pull/6874#discussion_r422678092



##########
File path: pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/StreamsRecordProcessor.java
##########
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.dynamodb;
+
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
+import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
+import com.amazonaws.services.kinesis.model.Record;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+@Slf4j
+// This is a direct adaptation of the kinesis record processor for kcl v1; no dynamo-specific logic
+public class StreamsRecordProcessor implements IRecordProcessor {
+
+    private final int numRetries;
+    private final long checkpointInterval;
+    private final long backoffTime;
+
+    private final LinkedBlockingQueue<StreamsRecord> queue;
+    private long nextCheckpointTimeInNanos;
+    private String kinesisShardId;
+    
+    public StreamsRecordProcessor(LinkedBlockingQueue<StreamsRecord> queue, DynamoDBSourceConfig config) {
+        this.queue = queue;
+        this.checkpointInterval = config.getCheckpointInterval();
+        this.numRetries = config.getNumRetries();
+        this.backoffTime = config.getBackoffTime();
+    }
+
+    private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
+        log.info("Checkpointing shard " + kinesisShardId);
+        
+        for (int i = 0; i < numRetries; i++) {
+            try {
+                checkpointer.checkpoint();
+                break;
+            } catch (ShutdownException se) {
+                // Ignore checkpoint if the processor instance has been shutdown.
+                log.info("Caught shutdown exception, skipping checkpoint.", se);
+                break;
+            } catch (InvalidStateException e) {
+                log.error("Cannot save checkpoint to the DynamoDB table.", e);
+                break;
+            } catch (ThrottlingException | KinesisClientLibDependencyException e) {
+                // Back off and re-attempt checkpoint upon transient failures
+                if (i >= (numRetries - 1)) {
+                    log.error("Checkpoint failed after " + (i + 1) + " attempts.", e);
+                    break;
+                }
+            }
+
+            try {
+                Thread.sleep(backoffTime);
+            } catch (InterruptedException e) {
+                log.debug("Interrupted sleep", e);
+            }
+        }
+    }
+
+    @Override
+    public void initialize(InitializationInput initializationInput) {
+        kinesisShardId = initializationInput.getShardId();
+    }
+
+    @Override
+    public void processRecords(ProcessRecordsInput processRecordsInput) {
+
+        log.info("Processing {} records from {}", processRecordsInput.getRecords().size(), kinesisShardId);

Review comment:
       I'm wondering how much logging it will produce 🤔 
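The checkpoint loop quoted above retries transient failures with a fixed backoff. On `InterruptedException`, however, it only logs at DEBUG and moves on to the next attempt. Since `Thread.sleep` clears the interrupt status when it throws, the interrupt is lost. A common variant restores the flag and stops retrying.

Below is a minimal JDK-only sketch of that variant. `RetryingCheckpoint`, `Checkpointer`, `TransientFailure`, and `PermanentFailure` are hypothetical stand-ins for the following KCL v1 types:

- `IRecordProcessorCheckpointer`;
- `ThrottlingException` / `KinesisClientLibDependencyException`;
- `ShutdownException` / `InvalidStateException`.

```java
class RetryingCheckpoint {
    /** Hypothetical stand-in for IRecordProcessorCheckpointer#checkpoint(). */
    interface Checkpointer {
        void checkpoint() throws TransientFailure, PermanentFailure;
    }

    /** Stands in for ThrottlingException / KinesisClientLibDependencyException: retry. */
    static class TransientFailure extends Exception {}

    /** Stands in for ShutdownException / InvalidStateException: give up immediately. */
    static class PermanentFailure extends Exception {}

    /** Returns true if a checkpoint succeeded within numRetries attempts. */
    static boolean checkpoint(Checkpointer checkpointer, int numRetries, long backoffMillis) {
        for (int attempt = 1; attempt <= numRetries; attempt++) {
            try {
                checkpointer.checkpoint();
                return true;
            } catch (PermanentFailure e) {
                return false;
            } catch (TransientFailure e) {
                if (attempt == numRetries) {
                    return false; // the caller can log "Checkpoint failed after N attempts."
                }
            }
            try {
                Thread.sleep(backoffMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the worker
                return false;
            }
        }
        return false;
    }
}
```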







[GitHub] [pulsar] maths22 commented on a change in pull request #6874: Add dynamodb streams source

Posted by GitBox <gi...@apache.org>.
maths22 commented on a change in pull request #6874:
URL: https://github.com/apache/pulsar/pull/6874#discussion_r419804568



##########
File path: pulsar-io/dynamodb/pom.xml
##########
@@ -0,0 +1,105 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-io</artifactId>
+    <version>2.6.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>pulsar-io-dynamodb</artifactId>
+  <name>Pulsar IO :: DynamoDB</name>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-functions-instance</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-aws</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-yaml</artifactId>
+    </dependency>
+
+    <!-- add cbor for kinesis-client to fix dep conflict -->
+    <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-cbor</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+    </dependency>
+
+    <!-- dynamodb dependencies -->
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>dynamodb-streams-kinesis-adapter</artifactId>

Review comment:
       Yes (and IMO that makes it clear that the KCL version is determined by the kinesis adapter)







[GitHub] [pulsar] sijie commented on pull request #6874: Add dynamodb streams source

Posted by GitBox <gi...@apache.org>.
sijie commented on pull request #6874:
URL: https://github.com/apache/pulsar/pull/6874#issuecomment-630475202


   @addisonj @codelipenghui can you review this pull request again?





[GitHub] [pulsar] maths22 commented on pull request #6874: Add dynamodb streams source

Posted by GitBox <gi...@apache.org>.
maths22 commented on pull request #6874:
URL: https://github.com/apache/pulsar/pull/6874#issuecomment-624184445


   /pulsarbot run-failure-checks





[GitHub] [pulsar] codelipenghui commented on pull request #6874: Add dynamodb streams source

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #6874:
URL: https://github.com/apache/pulsar/pull/6874#issuecomment-626084222


   @gaoran10 Please also help review this PR.





[GitHub] [pulsar] vzhikserg commented on a change in pull request #6874: Add dynamodb streams source

Posted by GitBox <gi...@apache.org>.
vzhikserg commented on a change in pull request #6874:
URL: https://github.com/apache/pulsar/pull/6874#discussion_r419980781



##########
File path: pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSourceConfig.java
##########
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.dynamodb;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.Date;
+import java.util.Map;
+
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
+import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
+import com.amazonaws.services.dynamodbv2.*;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
+import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+import software.amazon.awssdk.regions.Region;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+@Data
+@EqualsAndHashCode

Review comment:
       `@Data` already [includes](https://projectlombok.org/features/Data) `@EqualsAndHashCode`, so the explicit annotation is redundant.

##########
File path: pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/StreamsRecordProcessor.java
##########
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.dynamodb;
+
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
+import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
+import com.amazonaws.services.kinesis.model.Record;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+@Slf4j
+// This is a direct adaptation of the kinesis record processor for kcl v1; no dynamo-specific logic
+public class StreamsRecordProcessor implements IRecordProcessor {
+
+    private final int numRetries;
+    private final long checkpointInterval;
+    private final long backoffTime;
+
+    private final LinkedBlockingQueue<StreamsRecord> queue;
+    private long nextCheckpointTimeInNanos;
+    private String kinesisShardId;
+    
+    public StreamsRecordProcessor(LinkedBlockingQueue<StreamsRecord> queue, DynamoDBSourceConfig config) {
+        this.queue = queue;
+        this.checkpointInterval = config.getCheckpointInterval();
+        this.numRetries = config.getNumRetries();
+        this.backoffTime = config.getBackoffTime();
+    }
+
+    private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
+        log.info("Checkpointing shard " + kinesisShardId);
+        
+        for (int i = 0; i < numRetries; i++) {
+            try {
+                checkpointer.checkpoint();
+                break;
+            } catch (ShutdownException se) {
+                // Ignore checkpoint if the processor instance has been shutdown.
+                log.info("Caught shutdown exception, skipping checkpoint.", se);
+                break;
+            } catch (InvalidStateException e) {
+                log.error("Cannot save checkpoint to the DynamoDB table.", e);
+                break;
+            } catch (ThrottlingException | KinesisClientLibDependencyException e) {
+                // Back off and re-attempt checkpoint upon transient failures
+                if (i >= (numRetries - 1)) {
+                    log.error("Checkpoint failed after " + (i + 1) + " attempts.", e);
+                    break;
+                }
+            }
+
+            try {
+                Thread.sleep(backoffTime);
+            } catch (InterruptedException e) {
+                log.debug("Interrupted sleep", e);
+            }
+        }
+    }
+
+    @Override
+    public void initialize(InitializationInput initializationInput) {
+        kinesisShardId = initializationInput.getShardId();
+    }
+
+    @Override
+    public void processRecords(ProcessRecordsInput processRecordsInput) {
+
+        log.info("Processing " + processRecordsInput.getRecords().size() + " records from " + kinesisShardId);

Review comment:
       Please use parameterized (`{}` placeholder) log messages instead of string concatenation:
   `log.info("Processing {} records from {}", processRecordsInput.getRecords().size(), kinesisShardId);`
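A stdlib-only sketch of why the placeholder form is preferred. The tiny logger below is hypothetical, not SLF4J: it only mimics `{}` substitution and checks the level before building the message, so a disabled `info` call never pays for string formatting. With concatenation, the message string is built before the call regardless of level. The arguments themselves are still evaluated (and boxed); SLF4J's real formatter also handles escaped braces and a trailing exception argument, which this sketch omits.

```java
// Hypothetical stand-in logger, NOT SLF4J: it only mimics "{}" substitution
// and the level check that happens before any formatting work.
class PlaceholderLogging {
    static boolean infoEnabled = false; // assumed level switch
    static int formatCalls = 0;         // counts how often a message was actually built

    // Replace each "{}" with the next argument, left to right (no escape handling).
    static String format(String pattern, Object... args) {
        formatCalls++;
        StringBuilder sb = new StringBuilder();
        int start = 0;
        int argIndex = 0;
        int idx;
        while (argIndex < args.length && (idx = pattern.indexOf("{}", start)) >= 0) {
            sb.append(pattern, start, idx).append(args[argIndex++]);
            start = idx + 2;
        }
        sb.append(pattern, start, pattern.length());
        return sb.toString();
    }

    // The message string is built only if the level is enabled.
    static void info(String pattern, Object... args) {
        if (infoEnabled) {
            System.out.println(format(pattern, args));
        }
    }

    public static void main(String[] argv) {
        int recordCount = 3;
        String shardId = "shardId-000000000000";

        info("Processing {} records from {}", recordCount, shardId);
        System.out.println("messages built while disabled: " + formatCalls);

        infoEnabled = true;
        info("Processing {} records from {}", recordCount, shardId);
        System.out.println("messages built while enabled: " + formatCalls);
    }
}
```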

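The `checkpoint` method in the excerpt above retries only transient failures (throttling and dependency errors), gives up at once on shutdown or invalid-state errors, and sleeps `backoffTime` between attempts. A stdlib-only sketch of that control flow follows; the exception types and the `CheckpointAction` interface are hypothetical stand-ins for the KCL classes, and the boolean return value exists only to make the outcome observable. One deliberate difference: on `InterruptedException` this sketch restores the interrupt flag and stops retrying, rather than logging and continuing.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CheckpointRetry {
    // Hypothetical stand-in for ThrottlingException / KinesisClientLibDependencyException.
    static class TransientCheckpointException extends Exception {
        TransientCheckpointException(String message) { super(message); }
    }

    // Hypothetical stand-in for ShutdownException / InvalidStateException.
    static class FatalCheckpointException extends Exception {
        FatalCheckpointException(String message) { super(message); }
    }

    // Hypothetical stand-in for IRecordProcessorCheckpointer::checkpoint.
    interface CheckpointAction {
        void run() throws TransientCheckpointException, FatalCheckpointException;
    }

    // Returns true if the checkpoint eventually succeeded.
    static boolean checkpointWithRetries(CheckpointAction action, int numRetries, long backoffMillis) {
        for (int attempt = 1; attempt <= numRetries; attempt++) {
            try {
                action.run();
                return true;
            } catch (FatalCheckpointException e) {
                // Retrying cannot help (e.g. shutdown): give up immediately.
                System.err.println("Skipping checkpoint: " + e.getMessage());
                return false;
            } catch (TransientCheckpointException e) {
                if (attempt == numRetries) {
                    System.err.println("Checkpoint failed after " + attempt + " attempts: " + e.getMessage());
                    return false;
                }
            }
            try {
                Thread.sleep(backoffMillis); // back off before the next attempt
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop retrying
                return false;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Fails twice with a transient error, then succeeds.
        CheckpointAction flaky = () -> {
            if (calls.incrementAndGet() < 3) {
                throw new TransientCheckpointException("throttled");
            }
        };
        boolean ok = checkpointWithRetries(flaky, 5, 10L);
        System.out.println("succeeded=" + ok + " after " + calls.get() + " calls");
    }
}
```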






[GitHub] [pulsar] codelipenghui commented on a change in pull request #6874: Add dynamodb streams source

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #6874:
URL: https://github.com/apache/pulsar/pull/6874#discussion_r426610247



##########
File path: site2/docs/io-kinesis-sink.md
##########
@@ -19,20 +19,20 @@ The configuration of the Kinesis sink connector has the following property.
 `awsEndpoint`|String|false|" " (empty string)|The Kinesis end-point URL, which can be found [here](https://docs.aws.amazon.com/general/latest/gr/rande.html).
 `awsRegion`|String|false|" " (empty string)|The AWS region. <br/><br/>**Example**<br/> us-west-1, us-west-2
 `awsKinesisStreamName`|String|true|" " (empty string)|The Kinesis stream name.
-`awsCredentialPluginName`|String|false|" " (empty string)|The fully-qualified class name of implementation of {@inject: github:`AwsCredentialProviderPlugin`:/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java}. <br/><br/>It is a factory class which creates an AWSCredentialsProvider that is used by Kinesis sink. <br/><br/>If it is empty, the Kinesis sink creates a default AWSCredentialsProvider which accepts json-map of credentials in `awsCredentialPluginParam`.
+`awsCredentialPluginName`|String|false|" " (empty string)|The fully-qualified class name of implementation of {@inject: github:`AwsCredentialProviderPlugin`:/pulsar-io/aws/src/main/java/org/apache/pulsar/io/aws/AwsCredentialProviderPlugin.java}. <br/><br/>It is a factory class which creates an AWSCredentialsProvider that is used by Kinesis sink. <br/><br/>If it is empty, the Kinesis sink creates a default AWSCredentialsProvider which accepts json-map of credentials in `awsCredentialPluginParam`.

Review comment:
       As you described, it's better to keep backward compatibility.
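One common way to keep existing `awsCredentialPluginName` values working after the interface moves from `org.apache.pulsar.io.kinesis` to `org.apache.pulsar.io.aws` is to leave a deprecated interface at the old location that extends the new one. Plugins compiled against the old interface name then still load and still pass the loader's type check. The sketch below illustrates that idea with hypothetical nested interfaces and a simplified method set; it is not the actual Pulsar code, and the real interface returns AWS SDK credential providers.

```java
class PluginLoading {
    // Simplified, hypothetical stand-in for the relocated plugin interface.
    interface AwsCredentialProviderPlugin {
        void init(String param);
        String describe();
    }

    // Hypothetical legacy interface left at the old location. Because it extends the
    // new interface, classes that still implement it pass the type check below.
    @Deprecated
    interface LegacyAwsCredentialProviderPlugin extends AwsCredentialProviderPlugin {
    }

    // An existing user plugin compiled against the legacy interface.
    public static class MyLegacyPlugin implements LegacyAwsCredentialProviderPlugin {
        private String param;

        public MyLegacyPlugin() {
        }

        @Override
        public void init(String param) {
            this.param = param;
        }

        @Override
        public String describe() {
            return "legacy plugin initialized with " + param;
        }
    }

    // Instantiate a plugin from a fully-qualified class name, the way a setting such as
    // awsCredentialPluginName is meant to be used.
    static AwsCredentialProviderPlugin load(String className, String param) {
        final Class<?> clazz;
        try {
            clazz = Class.forName(className);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Plugin class not found: " + className, e);
        }
        if (!AwsCredentialProviderPlugin.class.isAssignableFrom(clazz)) {
            throw new IllegalArgumentException(className + " does not implement AwsCredentialProviderPlugin");
        }
        try {
            AwsCredentialProviderPlugin plugin =
                    (AwsCredentialProviderPlugin) clazz.getDeclaredConstructor().newInstance();
            plugin.init(param);
            return plugin;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        // getName() returns the binary name, e.g. "PluginLoading$MyLegacyPlugin".
        String className = MyLegacyPlugin.class.getName();
        System.out.println(load(className, "{}").describe());
    }
}
```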







[GitHub] [pulsar] vzhikserg commented on a change in pull request #6874: Add dynamodb streams source

Posted by GitBox <gi...@apache.org>.
vzhikserg commented on a change in pull request #6874:
URL: https://github.com/apache/pulsar/pull/6874#discussion_r425407527



##########
File path: pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/StreamsRecord.java
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.dynamodb;
+
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter;
+import lombok.Getter;
+import org.apache.pulsar.functions.api.Record;
+import software.amazon.awssdk.utils.StringUtils;
+
+/**
+ *  This is a direct adaptation of the kinesis record for kcl v1,
+ *  with a little branching added for dynamo-specific logic.
+ */
+
+public class StreamsRecord implements Record<byte[]> {
+    
+    public static final String ARRIVAL_TIMESTAMP = "ARRIVAL_TIMESTAMP";
+    public static final String ENCRYPTION_TYPE = "ENCRYPTION_TYPE";
+    public static final String PARTITION_KEY = "PARTITION_KEY";
+    public static final String SEQUENCE_NUMBER = "SEQUENCE_NUMBER";
+    public static final String EVENT_NAME = "EVENT_NAME";
+
+    private static final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
+    @Getter

Review comment:
       It should be fine to put a single `@Getter` on the class instead of annotating every field.

##########
File path: pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/StreamsRecord.java
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.dynamodb;
+
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter;
+import lombok.Getter;
+import org.apache.pulsar.functions.api.Record;
+import software.amazon.awssdk.utils.StringUtils;
+
+/**
+ *  This is a direct adaptation of the kinesis record for kcl v1,
+ *  with a little branching added for dynamo-specific logic.
+ */
+
+public class StreamsRecord implements Record<byte[]> {
+    
+    public static final String ARRIVAL_TIMESTAMP = "ARRIVAL_TIMESTAMP";
+    public static final String ENCRYPTION_TYPE = "ENCRYPTION_TYPE";
+    public static final String PARTITION_KEY = "PARTITION_KEY";
+    public static final String SEQUENCE_NUMBER = "SEQUENCE_NUMBER";
+    public static final String EVENT_NAME = "EVENT_NAME";
+
+    private static final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
+    @Getter
+    private final Optional<String> key;
+    @Getter
+    private final byte[] value;
+    @Getter
+    private final HashMap<String, String> userProperties = new HashMap<String, String> ();

Review comment:
       If these values should be attached to the record, this field should be renamed to `properties`, so that the generated `getProperties()` overrides `Record#getProperties()`. The type can also be changed to `Map<String, String>`.
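A stdlib-only sketch of why the field name matters: Lombok's `@Getter` derives the method name from the field, so `userProperties` yields `getUserProperties()`, which does not override `getProperties()` on Pulsar's `Record` interface. The nested `Record` below is a simplified stand-in, assumed to mirror that interface's default `getProperties()` (an empty map), and the getters are written out by hand in place of Lombok.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class RecordProperties {
    static final String EVENT_NAME = "EVENT_NAME";

    // Simplified stand-in for org.apache.pulsar.functions.api.Record (assumption:
    // only the parts needed here, with a default getProperties() returning an empty map).
    interface Record<T> {
        Optional<String> getKey();
        T getValue();
        default Map<String, String> getProperties() {
            return Collections.emptyMap();
        }
    }

    // Field named userProperties: its getter is getUserProperties(), which does NOT
    // override getProperties(), so callers of the interface still see an empty map.
    static class UserPropertiesRecord implements Record<byte[]> {
        private final Optional<String> key;
        private final byte[] value;
        private final HashMap<String, String> userProperties = new HashMap<>();

        UserPropertiesRecord(String key, byte[] value, String eventName) {
            this.key = Optional.ofNullable(key);
            this.value = value;
            userProperties.put(EVENT_NAME, eventName);
        }

        @Override public Optional<String> getKey() { return key; }
        @Override public byte[] getValue() { return value; }
        public HashMap<String, String> getUserProperties() { return userProperties; }
    }

    // Field named properties and typed as Map: its getter overrides Record#getProperties().
    static class PropertiesRecord implements Record<byte[]> {
        private final Optional<String> key;
        private final byte[] value;
        private final Map<String, String> properties = new HashMap<>();

        PropertiesRecord(String key, byte[] value, String eventName) {
            this.key = Optional.ofNullable(key);
            this.value = value;
            properties.put(EVENT_NAME, eventName);
        }

        @Override public Optional<String> getKey() { return key; }
        @Override public byte[] getValue() { return value; }
        @Override public Map<String, String> getProperties() { return properties; }
    }

    public static void main(String[] args) {
        byte[] payload = "{\"id\":1}".getBytes(StandardCharsets.UTF_8);
        Record<byte[]> before = new UserPropertiesRecord("pk-1", payload, "INSERT");
        Record<byte[]> after = new PropertiesRecord("pk-1", payload, "INSERT");
        System.out.println("userProperties field: " + before.getProperties());
        System.out.println("properties field: " + after.getProperties());
    }
}
```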







[GitHub] [pulsar] maths22 commented on a change in pull request #6874: Add dynamodb streams source

Posted by GitBox <gi...@apache.org>.
maths22 commented on a change in pull request #6874:
URL: https://github.com/apache/pulsar/pull/6874#discussion_r419804908



##########
File path: pulsar-io/aws/pom.xml
##########
@@ -0,0 +1,60 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-io</artifactId>
+    <version>2.6.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>pulsar-io-aws</artifactId>
+  <name>Pulsar IO :: IO AWS</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-io-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+    </dependency>
+
+    <!-- aws dependencies -->
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-sts</artifactId>

Review comment:
       Sure, I can change that.  I just copy-pasted this from its old location.







[GitHub] [pulsar] addisonj commented on pull request #6874: Add dynamodb streams source

Posted by GitBox <gi...@apache.org>.
addisonj commented on pull request #6874:
URL: https://github.com/apache/pulsar/pull/6874#issuecomment-623750937


   A few minor comments; overall this looks sane to me.





[GitHub] [pulsar] addisonj commented on a change in pull request #6874: Add dynamodb streams source

Posted by GitBox <gi...@apache.org>.
addisonj commented on a change in pull request #6874:
URL: https://github.com/apache/pulsar/pull/6874#discussion_r419775903



##########
File path: pulsar-io/aws/pom.xml
##########
@@ -0,0 +1,60 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-io</artifactId>
+    <version>2.6.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>pulsar-io-aws</artifactId>
+  <name>Pulsar IO :: IO AWS</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-io-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+    </dependency>
+
+    <!-- aws dependencies -->
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-sts</artifactId>

Review comment:
       Can we use the `aws-sdk.version` property defined in the top-level `pom.xml` here? That may help prevent version conflicts if another module also loads `aws-java-sdk-sts` using that variable.







[GitHub] [pulsar] maths22 commented on pull request #6874: Add dynamodb streams source

Posted by GitBox <gi...@apache.org>.
maths22 commented on pull request #6874:
URL: https://github.com/apache/pulsar/pull/6874#issuecomment-624212268


   /pulsarbot run-failure-checks

