Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/10/24 01:32:36 UTC

[GitHub] [incubator-seatunnel] liugddx opened a new pull request, #3166: [Feature][Connector-V2] [Amazondynamodb Connector]add amazondynamodb source & sink connnector

liugddx opened a new pull request, #3166:
URL: https://github.com/apache/incubator-seatunnel/pull/3166

   ## Purpose of this pull request
   
   <!-- Describe the purpose of this pull request. For example: This pull request adds checkstyle plugin.-->
   
   ## Check list
   
   * [ ] Code changes are covered with tests, or do not need tests for this reason:
   * [ ] If any new Jar binary package is added in your PR, please add a License Notice according to the
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] Hisoka-X commented on a diff in pull request #3166: [Feature][Connector-V2] [Amazondynamodb Connector]add amazondynamodb source & sink connnector

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on code in PR #3166:
URL: https://github.com/apache/incubator-seatunnel/pull/3166#discussion_r1011552120


##########
seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbSourceOptions.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config;
+
+import org.apache.seatunnel.connectors.seatunnel.common.config.CommonConfig;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+@AllArgsConstructor
+public class AmazondynamodbSourceOptions implements Serializable {
+
+    private static final int DEFAULT_BATCH_SIZE = 25;
+    private static final int DEFAULT_BATCH_INTERVAL_MS = 1000;
+
+    private String url;
+
+    private String region;
+
+    private String accessKeyId;
+
+    private String secretAccessKey;
+
+    private String table;
+
+    private Config schema;
+
+    public int batchSize = DEFAULT_BATCH_SIZE;
+    public int batchIntervalMs = DEFAULT_BATCH_INTERVAL_MS;
+
+    public AmazondynamodbSourceOptions(Config config) {
+        if (config.hasPath(AmazondynamodbConfig.URL)) {

Review Comment:
   You have already checked that this option exists, so there is no need to check it again.





[GitHub] [incubator-seatunnel] Hisoka-X commented on a diff in pull request #3166: [Feature][Connector-V2] [Amazondynamodb Connector]add amazondynamodb source & sink connnector

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on code in PR #3166:
URL: https://github.com/apache/incubator-seatunnel/pull/3166#discussion_r1011354753


##########
seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbSourceOptions.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config;
+
+import org.apache.seatunnel.connectors.seatunnel.common.config.CommonConfig;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+@AllArgsConstructor
+public class AmazondynamodbSourceOptions implements Serializable {
+
+    private static final int DEFAULT_BATCH_SIZE = 25;
+    private static final int DEFAULT_BATCH_INTERVAL_MS = 1000;
+
+    private String url;
+
+    private String region;
+
+    private String accessKeyId;
+
+    private String secretAccessKey;
+
+    private String table;
+
+    private Config schema;
+
+    public int batchSize = DEFAULT_BATCH_SIZE;
+    public int batchIntervalMs = DEFAULT_BATCH_INTERVAL_MS;
+
+    public AmazondynamodbSourceOptions(Config config) {
+        if (config.hasPath(AmazondynamodbConfig.URL)) {

Review Comment:
   It seems these config options don't have default values. Maybe you should use `CheckConfigUtil.checkAllExists` to make sure the user didn't forget to set them.
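The idea behind this comment can be sketched in plain Java. This is a minimal illustration, not SeaTunnel's `CheckConfigUtil`: the class `RequiredOptions`, its methods, and the option keys used in `main` (`url`, `region`, `table`) are hypothetical names chosen for the example, and a `Map` stands in for the real `Config` object.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a required-option check; NOT SeaTunnel's CheckConfigUtil.
class RequiredOptions {

    // Returns the required keys that are absent (or mapped to null) in the config.
    static List<String> missingKeys(Map<String, String> config, String... requiredKeys) {
        List<String> missing = new ArrayList<>();
        for (String key : requiredKeys) {
            if (config.get(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }

    // Fails fast with a single message that names every missing option.
    static void checkAllExist(Map<String, String> config, String... requiredKeys) {
        List<String> missing = missingKeys(config, requiredKeys);
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException("Missing required options: " + missing);
        }
    }

    public static void main(String[] args) {
        // Assumed option keys, for illustration only.
        Map<String, String> config = new HashMap<>();
        config.put("url", "http://localhost:8000");
        config.put("region", "us-east-1");
        try {
            checkAllExist(config, "url", "region", "table");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Validating all required options once, up front, lets the options constructor read each value directly instead of guarding every read with its own `hasPath` check.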





[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3166: [Feature][Connector-V2] [Amazondynamodb Connector]add amazondynamodb source & sink connnector

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #3166:
URL: https://github.com/apache/incubator-seatunnel/pull/3166#discussion_r1002838578


##########
seatunnel-connectors-v2/connector-amazondynamodb/pom.xml:
##########
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel-connectors-v2</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>connector-amazondynamodb</artifactId>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>

Review Comment:
   Please remove these two compiler properties.





[GitHub] [incubator-seatunnel] liugddx commented on pull request #3166: [Feature][Connector-V2] [Amazondynamodb Connector]add amazondynamodb source & sink connnector

Posted by GitBox <gi...@apache.org>.
liugddx commented on PR #3166:
URL: https://github.com/apache/incubator-seatunnel/pull/3166#issuecomment-1288335083

   > Could you add docs for this?
   > 
   > https://github.com/apache/incubator-seatunnel/tree/dev/docs/en/connector-v2
   
   Thank you for your code review. This PR is still under development, and I will fix all the problems.
   
   




[GitHub] [incubator-seatunnel] liugddx commented on a diff in pull request #3166: [Feature][Connector-V2] [Amazondynamodb Connector]add amazondynamodb source & sink connnector

Posted by GitBox <gi...@apache.org>.
liugddx commented on code in PR #3166:
URL: https://github.com/apache/incubator-seatunnel/pull/3166#discussion_r1009317014


##########
seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazondynamodbWriter.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class AmazondynamodbWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private final AmazondynamodbSourceOptions amazondynamodbSourceOptions;
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final DynamoDbClient dynamoDbClient;
+    private final List<AttributeValue.Type> measurementsType;
+
+    public AmazondynamodbWriter(AmazondynamodbSourceOptions amazondynamodbSourceOptions, SeaTunnelRowType seaTunnelRowType) {
+        this.amazondynamodbSourceOptions = amazondynamodbSourceOptions;
+        this.seaTunnelRowType = seaTunnelRowType;
+        dynamoDbClient = DynamoDbClient.builder()
+            .endpointOverride(URI.create(amazondynamodbSourceOptions.getUrl()))
+            // The region is meaningless for local DynamoDb but required for client builder validation
+            .region(Region.of(amazondynamodbSourceOptions.getRegion()))
+            .credentialsProvider(StaticCredentialsProvider.create(
+                AwsBasicCredentials.create(amazondynamodbSourceOptions.getAccessKeyId(), amazondynamodbSourceOptions.getSecretAccessKey())))
+            .build();
+        this.measurementsType = convertTypes(seaTunnelRowType);
+    }
+
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+
+        dynamoDbClient.putItem(convertRow(element, seaTunnelRowType));

Review Comment:
   > Is `putItem` a synchronous write? Could batch writes be used here to improve performance?
   
   Done.





[GitHub] [incubator-seatunnel] EricJoy2048 closed pull request #3166: [Feature][Connector-V2] [Amazondynamodb Connector]add amazondynamodb source & sink connnector

Posted by GitBox <gi...@apache.org>.
EricJoy2048 closed pull request #3166: [Feature][Connector-V2] [Amazondynamodb Connector]add amazondynamodb source & sink connnector
URL: https://github.com/apache/incubator-seatunnel/pull/3166




[GitHub] [incubator-seatunnel] ic4y commented on a diff in pull request #3166: [Feature][Connector-V2] [Amazondynamodb Connector]add amazondynamodb source & sink connnector

Posted by GitBox <gi...@apache.org>.
ic4y commented on code in PR #3166:
URL: https://github.com/apache/incubator-seatunnel/pull/3166#discussion_r1009002311


##########
seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazondynamodbWriter.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class AmazondynamodbWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private final AmazondynamodbSourceOptions amazondynamodbSourceOptions;
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final DynamoDbClient dynamoDbClient;
+    private final List<AttributeValue.Type> measurementsType;
+
+    public AmazondynamodbWriter(AmazondynamodbSourceOptions amazondynamodbSourceOptions, SeaTunnelRowType seaTunnelRowType) {
+        this.amazondynamodbSourceOptions = amazondynamodbSourceOptions;
+        this.seaTunnelRowType = seaTunnelRowType;
+        dynamoDbClient = DynamoDbClient.builder()
+            .endpointOverride(URI.create(amazondynamodbSourceOptions.getUrl()))
+            // The region is meaningless for local DynamoDb but required for client builder validation
+            .region(Region.of(amazondynamodbSourceOptions.getRegion()))
+            .credentialsProvider(StaticCredentialsProvider.create(
+                AwsBasicCredentials.create(amazondynamodbSourceOptions.getAccessKeyId(), amazondynamodbSourceOptions.getSecretAccessKey())))
+            .build();
+        this.measurementsType = convertTypes(seaTunnelRowType);
+    }
+
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+
+        dynamoDbClient.putItem(convertRow(element, seaTunnelRowType));

Review Comment:
   Is `putItem` a synchronous write? Could batch writes be used here to improve performance?
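The batching suggested here can be sketched without the AWS SDK. This is a minimal illustration under stated assumptions: `BatchBuffer` and its `sender` callback are hypothetical names, and in a real sink the callback would be the place that issues one batch write request per list. DynamoDB's `BatchWriteItem` accepts at most 25 put or delete requests per call, so the batch size should not exceed 25.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical buffer that groups single writes into batches; not part of the PR.
class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> sender;
    private final List<T> pending = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> sender) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sender = sender;
    }

    // Buffers one item and sends a batch as soon as the buffer is full.
    synchronized void add(T item) {
        pending.add(item);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    // Sends whatever is buffered; does nothing when the buffer is empty.
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sender.accept(new ArrayList<>(pending)); // copy so the callback owns its list
        pending.clear();
    }

    public static void main(String[] args) {
        BatchBuffer<Integer> buffer =
            new BatchBuffer<>(3, batch -> System.out.println("sending " + batch));
        for (int i = 1; i <= 7; i++) {
            buffer.add(i);
        }
        buffer.flush(); // sends the final partial batch
    }
}
```

A real implementation would also need to resend any items the service reports as unprocessed in the batch response, which this sketch leaves out.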



##########
seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazondynamodbWriter.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class AmazondynamodbWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private final AmazondynamodbSourceOptions amazondynamodbSourceOptions;
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final DynamoDbClient dynamoDbClient;

Review Comment:
   Does `DynamoDbClient` support serialization? Under different engines, the constructor and the write method of the SinkWriter object may not be called in the same JVM.
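One common way to address this concern is to keep only serializable settings in the object and create the client lazily in a `transient` field. The sketch below assumes that approach and uses hypothetical names throughout: `FakeClient` stands in for a non-serializable client such as `DynamoDbClient`, and `LazyClientWriter` is not a class from the PR.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical non-serializable client, standing in for a real SDK client.
class FakeClient {
    final String endpoint;

    FakeClient(String endpoint) {
        this.endpoint = endpoint;
    }
}

// Hypothetical writer: only the endpoint is serialized, the client is rebuilt on demand.
class LazyClientWriter implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String endpoint;
    private transient FakeClient client; // skipped by Java serialization

    LazyClientWriter(String endpoint) {
        this.endpoint = endpoint;
    }

    boolean isClientCreated() {
        return client != null;
    }

    // Creates the client on first use, in whichever JVM calls this method.
    FakeClient client() {
        if (client == null) {
            client = new FakeClient(endpoint);
        }
        return client;
    }

    // Serializes and deserializes the writer, simulating a move to another JVM.
    static LazyClientWriter roundTrip(LazyClientWriter writer) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(writer);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LazyClientWriter) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        LazyClientWriter original = new LazyClientWriter("http://localhost:8000");
        original.client(); // client exists in the "driver" JVM
        LazyClientWriter copy = roundTrip(original);
        System.out.println("client present after copy: " + copy.isClientCreated());
        System.out.println("endpoint after lazy init: " + copy.client().endpoint);
    }
}
```

With this pattern the client never needs to be serializable, because each JVM builds its own instance from the serialized settings.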





[GitHub] [incubator-seatunnel] liugddx commented on a diff in pull request #3166: [Feature][Connector-V2] [Amazondynamodb Connector]add amazondynamodb source & sink connnector

Posted by GitBox <gi...@apache.org>.
liugddx commented on code in PR #3166:
URL: https://github.com/apache/incubator-seatunnel/pull/3166#discussion_r1010050782


##########
docs/en/connector-v2/sink/Amazondynamodb.md:
##########
@@ -0,0 +1,73 @@
+````
+# Amazondynamodb
+
+> Amazondynamodb sink connector
+
+## Description
+
+Write data to `Amazondynamodb`
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)

Review Comment:
   Done.





[GitHub] [incubator-seatunnel] liugddx closed pull request #3166: [Feature][Connector-V2] [Amazondynamodb Connector]add amazondynamodb source & sink connnector

Posted by GitBox <gi...@apache.org>.
liugddx closed pull request #3166: [Feature][Connector-V2] [Amazondynamodb Connector]add amazondynamodb source & sink connnector
URL: https://github.com/apache/incubator-seatunnel/pull/3166




[GitHub] [incubator-seatunnel] ic4y commented on a diff in pull request #3166: [Feature][Connector-V2] [Amazondynamodb Connector]add amazondynamodb source & sink connnector

Posted by GitBox <gi...@apache.org>.
ic4y commented on code in PR #3166:
URL: https://github.com/apache/incubator-seatunnel/pull/3166#discussion_r1011332545


##########
seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DdynamoDbSinkClient.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.DefaultSeaTunnelRowDeserializer;
+import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.SeaTunnelRowDeserializer;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.PutRequest;
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+public class DdynamoDbSinkClient {
+    private final AmazondynamodbSourceOptions amazondynamodbSourceOptions;
+    private ScheduledExecutorService scheduler;
+    private ScheduledFuture<?> scheduledFuture;
+    private volatile boolean initialize;
+    private volatile Exception flushException;
+    private DynamoDbClient dynamoDbClient;
+    private final List<WriteRequest> batchList;
+    protected SeaTunnelRowDeserializer seaTunnelRowDeserializer;
+
+    public DdynamoDbSinkClient(AmazondynamodbSourceOptions amazondynamodbSourceOptions, SeaTunnelRowType typeInfo) {
+        this.amazondynamodbSourceOptions = amazondynamodbSourceOptions;
+        this.batchList = new ArrayList<>();
+        this.seaTunnelRowDeserializer = new DefaultSeaTunnelRowDeserializer(typeInfo);
+    }
+
+    private void tryInit() throws IOException {
+        if (initialize) {
+            return;
+        }
+        dynamoDbClient = DynamoDbClient.builder()
+            .endpointOverride(URI.create(amazondynamodbSourceOptions.getUrl()))
+            // The region is meaningless for local DynamoDb but required for client builder validation
+            .region(Region.of(amazondynamodbSourceOptions.getRegion()))
+            .credentialsProvider(StaticCredentialsProvider.create(
+                AwsBasicCredentials.create(amazondynamodbSourceOptions.getAccessKeyId(), amazondynamodbSourceOptions.getSecretAccessKey())))
+            .build();
+
+        scheduler = Executors.newSingleThreadScheduledExecutor(
+            new ThreadFactoryBuilder().setNameFormat("DdynamoDb-sink-output-%s").build());
+        scheduledFuture = scheduler.scheduleAtFixedRate(
+            () -> {
+                try {
+                    flush();
+                } catch (IOException e) {
+                    flushException = e;
+                }
+            },
+            amazondynamodbSourceOptions.getBatchIntervalMs(),
+            amazondynamodbSourceOptions.getBatchIntervalMs(),
+            TimeUnit.MILLISECONDS);
+
+        initialize = true;
+    }
+
+    public synchronized void write(PutItemRequest putItemRequest) throws IOException {
+        tryInit();
+        checkFlushException();
+        batchList.add(WriteRequest.builder().putRequest(
+            PutRequest.builder().item(putItemRequest.item()).build()).build());
+        if (amazondynamodbSourceOptions.getBatchSize() > 0
+            && batchList.size() >= amazondynamodbSourceOptions.getBatchSize()) {
+            flush();
+        }
+    }
+
+    public synchronized void close() throws IOException {
+        if (scheduledFuture != null) {
+            scheduledFuture.cancel(false);
+            scheduler.shutdown();
+        }
+        flush();

Review Comment:
   The `close` method is also called when no data has been written. In that case `dynamoDbClient` has not been initialized yet, and a NullPointerException is thrown. This can be tested with a job whose source does not output any data.
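A guard for this case might look like the following. It is only a sketch under assumptions: `GuardedSink` is a hypothetical class, and a plain `List` stands in for the lazily created `dynamoDbClient`, so the real fix in the PR may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sink whose "client" is created on the first write; not the PR's class.
class GuardedSink {
    private List<String> client; // stand-in for the real client; null until first write
    private final List<String> pending = new ArrayList<>();
    private int sentCount = 0;

    int sentCount() {
        return sentCount;
    }

    synchronized void write(String item) {
        if (client == null) {
            client = new ArrayList<>(); // lazy initialization, as in tryInit()
        }
        pending.add(item);
    }

    // Safe to call before any write: returns early when there is nothing to send.
    synchronized void flush() {
        if (client == null || pending.isEmpty()) {
            return;
        }
        client.addAll(pending);
        sentCount += pending.size();
        pending.clear();
    }

    // Flushes remaining items and releases the client only if it was ever created.
    synchronized void close() {
        flush();
        if (client != null) {
            client = null;
        }
    }

    public static void main(String[] args) {
        new GuardedSink().close(); // no data written: must not throw
        GuardedSink sink = new GuardedSink();
        sink.write("row-1");
        sink.close();
        System.out.println("items sent: " + sink.sentCount());
    }
}
```

The empty-source case mentioned in the comment corresponds to the first line of `main`, where `close()` runs on a sink that never received a row.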





[GitHub] [incubator-seatunnel] liugddx commented on a diff in pull request #3166: [Feature][Connector-V2] [Amazondynamodb Connector]add amazondynamodb source & sink connnector

Posted by GitBox <gi...@apache.org>.
liugddx commented on code in PR #3166:
URL: https://github.com/apache/incubator-seatunnel/pull/3166#discussion_r1011834145


##########
seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbSourceOptions.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config;
+
+import org.apache.seatunnel.connectors.seatunnel.common.config.CommonConfig;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+@AllArgsConstructor
+public class AmazondynamodbSourceOptions implements Serializable {
+
+    private static final int DEFAULT_BATCH_SIZE = 25;
+    private static final int DEFAULT_BATCH_INTERVAL_MS = 1000;
+
+    private String url;
+
+    private String region;
+
+    private String accessKeyId;
+
+    private String secretAccessKey;
+
+    private String table;
+
+    private Config schema;
+
+    public int batchSize = DEFAULT_BATCH_SIZE;
+    public int batchIntervalMs = DEFAULT_BATCH_INTERVAL_MS;
+
+    public AmazondynamodbSourceOptions(Config config) {
+        if (config.hasPath(AmazondynamodbConfig.URL)) {

Review Comment:
   > You have already checked that it exists, so there is no need to check again.
   
   Done.
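
   For reference, a sketch of the pattern being asked for, with a plain `Map` standing in for the shaded Typesafe `Config`. The key names `url` and `batch_size`, the class `OptionsSketch`, and the helper `requireKeys` are assumptions made for illustration, not the connector's actual code:

```java
import java.util.Map;

// Hypothetical, simplified options holder; a Map stands in for the shaded Typesafe Config.
class OptionsSketch {
    static final int DEFAULT_BATCH_SIZE = 25;

    final String url;
    final int batchSize;

    OptionsSketch(Map<String, String> config) {
        // Required key: assumed to be validated by the caller, so it is read directly.
        this.url = config.get("url");
        // Optional key: the only place where a presence check is still needed.
        this.batchSize = config.containsKey("batch_size")
            ? Integer.parseInt(config.get("batch_size"))
            : DEFAULT_BATCH_SIZE;
    }

    // Single up-front validation of required keys (hypothetical helper).
    static void requireKeys(Map<String, String> config, String... keys) {
        for (String key : keys) {
            if (!config.containsKey(key)) {
                throw new IllegalArgumentException("Missing required option: " + key);
            }
        }
    }
}
```

   Required options are validated once before the options object is built, so the constructor only needs presence checks for optional keys that fall back to a default.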





[GitHub] [incubator-seatunnel] liugddx closed pull request #3166: [Feature][Connector-V2] [Amazondynamodb Connector]add amazondynamodb source & sink connnector

Posted by GitBox <gi...@apache.org>.
liugddx closed pull request #3166: [Feature][Connector-V2] [Amazondynamodb Connector]add amazondynamodb source & sink connnector
URL: https://github.com/apache/incubator-seatunnel/pull/3166




[GitHub] [incubator-seatunnel] liugddx commented on pull request #3166: [Feature][Connector-V2] [Amazondynamodb Connector]add amazondynamodb source & sink connnector

Posted by GitBox <gi...@apache.org>.
liugddx commented on PR #3166:
URL: https://github.com/apache/incubator-seatunnel/pull/3166#issuecomment-1296434928

   PTAL @hailin0 @Hisoka-X @EricJoy2048 @ic4y @CalvinKirs thanks.




[GitHub] [incubator-seatunnel] ic4y commented on a diff in pull request #3166: [Feature][Connector-V2] [Amazondynamodb Connector]add amazondynamodb source & sink connnector

Posted by GitBox <gi...@apache.org>.
ic4y commented on code in PR #3166:
URL: https://github.com/apache/incubator-seatunnel/pull/3166#discussion_r1010044406


##########
docs/en/connector-v2/sink/Amazondynamodb.md:
##########
@@ -0,0 +1,73 @@
+````
+# Amazondynamodb
+
+> Amazondynamodb sink connector
+
+## Description
+
+Write data to `Amazondynamodb`
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)

Review Comment:
   The key features list for a sink connector should contain only `exactly-once` and `schema projection`.



##########
docs/en/connector-v2/sink/Amazondynamodb.md:
##########
@@ -0,0 +1,73 @@
+````
+# Amazondynamodb
+
+> Amazondynamodb sink connector
+
+## Description
+
+Write data to `Amazondynamodb`
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)

Review Comment:
   Ditto





[GitHub] [incubator-seatunnel] liugddx commented on pull request #3166: [Feature][Connector-V2] [Amazondynamodb Connector]add amazondynamodb source & sink connnector

Posted by GitBox <gi...@apache.org>.
liugddx commented on PR #3166:
URL: https://github.com/apache/incubator-seatunnel/pull/3166#issuecomment-1298027528

   #3018 




[GitHub] [incubator-seatunnel] Hisoka-X commented on a diff in pull request #3166: [Feature][Connector-V2] [Amazondynamodb Connector]add amazondynamodb source & sink connnector

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on code in PR #3166:
URL: https://github.com/apache/incubator-seatunnel/pull/3166#discussion_r1009112945


##########
seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazondynamodbWriter.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class AmazondynamodbWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private final AmazondynamodbSourceOptions amazondynamodbSourceOptions;
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final DynamoDbClient dynamoDbClient;

Review Comment:
   The SinkWriter is created and runs in the same JVM, so it does not need to be serializable.





[GitHub] [incubator-seatunnel] Hisoka-X commented on pull request #3166: [Feature][Connector-V2] [Amazondynamodb Connector]add amazondynamodb source & sink connnector

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on PR #3166:
URL: https://github.com/apache/incubator-seatunnel/pull/3166#issuecomment-1299462953

   Don't worry, this is a bug in the engine. It is fixed by #3216.




[GitHub] [incubator-seatunnel] ic4y commented on a diff in pull request #3166: [Feature][Connector-V2] [Amazondynamodb Connector]add amazondynamodb source & sink connnector

Posted by GitBox <gi...@apache.org>.
ic4y commented on code in PR #3166:
URL: https://github.com/apache/incubator-seatunnel/pull/3166#discussion_r1009113584


##########
seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazondynamodbWriter.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class AmazondynamodbWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private final AmazondynamodbSourceOptions amazondynamodbSourceOptions;
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final DynamoDbClient dynamoDbClient;

Review Comment:
   My mistake, the SinkWriter doesn't need to be serializable.
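
   To illustrate the distinction as I understand it from this thread: the sink object may be serialized and shipped, while the writer is created afterwards on the worker and can hold a non-serializable client in a `final` field. `FakeClient`, `WriterSketch`, and `SinkSketch` are hypothetical names for illustration, not SeaTunnel or AWS SDK classes:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical non-serializable resource, standing in for a real client.
class FakeClient {
    final String endpoint;

    FakeClient(String endpoint) {
        this.endpoint = endpoint;
    }
}

// The writer is created where it runs and is never serialized,
// so a final non-serializable field is fine here.
class WriterSketch {
    final FakeClient client;

    WriterSketch(String endpoint) {
        this.client = new FakeClient(endpoint);
    }
}

// The sink is the object that gets shipped, so it holds only serializable configuration.
class SinkSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String endpoint;

    SinkSketch(String endpoint) {
        this.endpoint = endpoint;
    }

    WriterSketch createWriter() {
        return new WriterSketch(endpoint);
    }

    // Round-trips this sink through Java serialization, as a distributed engine might.
    SinkSketch roundTrip() {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(this);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SinkSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }
}
```

   Only the configuration crosses the serialization boundary. The client is built after deserialization, inside the writer.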





[GitHub] [incubator-seatunnel] liugddx closed pull request #3166: [Feature][Connector-V2] [Amazondynamodb Connector]add amazondynamodb source & sink connnector

Posted by GitBox <gi...@apache.org>.
liugddx closed pull request #3166: [Feature][Connector-V2] [Amazondynamodb Connector]add amazondynamodb source & sink connnector
URL: https://github.com/apache/incubator-seatunnel/pull/3166




[GitHub] [incubator-seatunnel] EricJoy2048 merged pull request #3166: [Feature][Connector-V2] [Amazondynamodb Connector]add amazondynamodb source & sink connnector

Posted by GitBox <gi...@apache.org>.
EricJoy2048 merged PR #3166:
URL: https://github.com/apache/incubator-seatunnel/pull/3166




[GitHub] [incubator-seatunnel] liugddx closed pull request #3166: [Feature][Connector-V2] [Amazondynamodb Connector]add amazondynamodb source & sink connnector

Posted by GitBox <gi...@apache.org>.
liugddx closed pull request #3166: [Feature][Connector-V2] [Amazondynamodb Connector]add amazondynamodb source & sink connnector
URL: https://github.com/apache/incubator-seatunnel/pull/3166

