Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/22 18:25:01 UTC

[GitHub] [flink-connector-aws] darenwkt opened a new pull request, #42: [FLINK-30229] Add SQL IT case for DynamoDB Sink

darenwkt opened a new pull request, #42:
URL: https://github.com/apache/flink-connector-aws/pull/42

   Added an integration test against DynamoDBLocal running in Docker, with dummy data generated by the datagen SQL connector.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-connector-aws] vahmed-hamdy commented on a diff in pull request #42: [FLINK-30229] Add SQL IT case for DynamoDB Sink

Posted by GitBox <gi...@apache.org>.
vahmed-hamdy commented on code in PR #42:
URL: https://github.com/apache/flink-connector-aws/pull/42#discussion_r1071243445


##########
flink-connector-dynamodb/pom.xml:
##########
@@ -132,5 +132,12 @@ under the License.
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner-loader</artifactId>

Review Comment:
   I am happy to follow this approach if we intend to have E2E tests. They need not run against an actual DDB (though that is preferable, of course), but we need to verify that the connector works when the uber jar is packaged for use with the SQL client.
   
   KDS tests for reference
   https://github.com/apache/flink-connector-aws/blob/2a24e7c1e8d2a8f613838fd0419177d4f96991bf/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/pom.xml#L99





[GitHub] [flink-connector-aws] darenwkt commented on a diff in pull request #42: [FLINK-30229] Add SQL IT case for DynamoDB Sink

Posted by GitBox <gi...@apache.org>.
darenwkt commented on code in PR #42:
URL: https://github.com/apache/flink-connector-aws/pull/42#discussion_r1071178637


##########
flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkITCase.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.table;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.connector.dynamodb.testutils.DynamoDBHelpers;
+import org.apache.flink.connector.dynamodb.testutils.DynamoDbContainer;
+import org.apache.flink.connector.dynamodb.util.DockerImageVersions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.testcontainers.containers.Network;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.net.URISyntaxException;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES;
+
+/** Integration test for {@link org.apache.flink.connector.dynamodb.table.DynamoDbDynamicSink}. */
+@Testcontainers
+@ExtendWith(MiniClusterExtension.class)
+public class DynamoDbDynamicSinkITCase {
+    private static final String PARTITION_KEY = "partition_key";
+    private static final String SORT_KEY = "sort_key";
+    private static DynamoDBHelpers dynamoDBHelpers;
+    private static String testTableName;
+    private static StreamExecutionEnvironment env;
+
+    // shared between test methods
+    @Container
+    public static final DynamoDbContainer LOCALSTACK =
+            new DynamoDbContainer(DockerImageName.parse(DockerImageVersions.DYNAMODB))
+                    .withCommand("-jar DynamoDBLocal.jar -inMemory -sharedDb")
+                    .withNetwork(Network.newNetwork())
+                    .withNetworkAliases("dynamodb");
+
+    @BeforeEach
+    public void setup() throws URISyntaxException {
+        testTableName = UUID.randomUUID().toString();
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        env.setParallelism(1);
+
+        dynamoDBHelpers = new DynamoDBHelpers(LOCALSTACK.getHostClient());
+    }
+
+    @Test
+    public void testSQLSink() throws ExecutionException, InterruptedException {
+
+        int expectedNumOfElements = 50;
+
+        dynamoDBHelpers.createTable(testTableName, PARTITION_KEY, SORT_KEY);
+        StreamTableEnvironment streamTableEnvironment =
+                StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build());
+
+        final String createTableStmt = getCreateTableStmt();
+        streamTableEnvironment.executeSql(createTableStmt);
+
+        final String datagenStmt = getDatagenStmt(expectedNumOfElements);
+        streamTableEnvironment.executeSql(datagenStmt);
+
+        final String insertSql = "INSERT INTO dynamo_db_table SELECT * from datagen;";
+        streamTableEnvironment.executeSql(insertSql).await();
+
+        Assertions.assertThat(dynamoDBHelpers.getItemsCount(testTableName))
+                .isEqualTo(expectedNumOfElements);
+    }
+
+    @Test
+    public void testTableAPISink() throws ExecutionException, InterruptedException {
+
+        int expectedNumOfElements = 50;
+
+        dynamoDBHelpers.createTable(testTableName, PARTITION_KEY, SORT_KEY);
+        StreamTableEnvironment streamTableEnvironment =
+                StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build());
+
+        final String createTableStmt = getCreateTableStmt();
+        streamTableEnvironment.executeSql(createTableStmt);
+
+        final String datagenStmt = getDatagenStmt(expectedNumOfElements);
+        streamTableEnvironment.executeSql(datagenStmt);
+
+        Table resultTable = streamTableEnvironment.sqlQuery("SELECT * FROM datagen");
+
+        resultTable.executeInsert("dynamo_db_table").await();
+
+        Assertions.assertThat(dynamoDBHelpers.getItemsCount(testTableName))
+                .isEqualTo(expectedNumOfElements);
+    }
+
+    private static String getDatagenStmt(int expectedNumOfElements) {
+        return "CREATE TEMPORARY TABLE datagen\n"
+                + "WITH (\n"
+                + "    'connector' = 'datagen',\n"
+                + "    'number-of-rows' = '"
+                + expectedNumOfElements
+                + "'\n"
+                + ")\n"
+                + "LIKE dynamo_db_table (EXCLUDING ALL);";
+    }
+
+    private static String getCreateTableStmt() {

Review Comment:
   Agreed, will move them to a test resource file.





[GitHub] [flink-connector-aws] darenwkt commented on a diff in pull request #42: [FLINK-30229] Add SQL IT case for DynamoDB Sink

Posted by GitBox <gi...@apache.org>.
darenwkt commented on code in PR #42:
URL: https://github.com/apache/flink-connector-aws/pull/42#discussion_r1071179131


##########
flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkITCase.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.table;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.connector.dynamodb.testutils.DynamoDBHelpers;
+import org.apache.flink.connector.dynamodb.testutils.DynamoDbContainer;
+import org.apache.flink.connector.dynamodb.util.DockerImageVersions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.testcontainers.containers.Network;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.net.URISyntaxException;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES;
+
+/** Integration test for {@link org.apache.flink.connector.dynamodb.table.DynamoDbDynamicSink}. */
+@Testcontainers
+@ExtendWith(MiniClusterExtension.class)
+public class DynamoDbDynamicSinkITCase {
+    private static final String PARTITION_KEY = "partition_key";
+    private static final String SORT_KEY = "sort_key";
+    private static DynamoDBHelpers dynamoDBHelpers;
+    private static String testTableName;
+    private static StreamExecutionEnvironment env;
+
+    // shared between test methods
+    @Container
+    public static final DynamoDbContainer LOCALSTACK =
+            new DynamoDbContainer(DockerImageName.parse(DockerImageVersions.DYNAMODB))
+                    .withCommand("-jar DynamoDBLocal.jar -inMemory -sharedDb")
+                    .withNetwork(Network.newNetwork())
+                    .withNetworkAliases("dynamodb");
+
+    @BeforeEach
+    public void setup() throws URISyntaxException {
+        testTableName = UUID.randomUUID().toString();
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        env.setParallelism(1);
+
+        dynamoDBHelpers = new DynamoDBHelpers(LOCALSTACK.getHostClient());
+    }
+
+    @Test
+    public void testSQLSink() throws ExecutionException, InterruptedException {
+
+        int expectedNumOfElements = 50;
+
+        dynamoDBHelpers.createTable(testTableName, PARTITION_KEY, SORT_KEY);
+        StreamTableEnvironment streamTableEnvironment =
+                StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build());
+
+        final String createTableStmt = getCreateTableStmt();
+        streamTableEnvironment.executeSql(createTableStmt);
+
+        final String datagenStmt = getDatagenStmt(expectedNumOfElements);
+        streamTableEnvironment.executeSql(datagenStmt);
+
+        final String insertSql = "INSERT INTO dynamo_db_table SELECT * from datagen;";
+        streamTableEnvironment.executeSql(insertSql).await();
+
+        Assertions.assertThat(dynamoDBHelpers.getItemsCount(testTableName))
+                .isEqualTo(expectedNumOfElements);
+    }
+
+    @Test
+    public void testTableAPISink() throws ExecutionException, InterruptedException {
+
+        int expectedNumOfElements = 50;
+
+        dynamoDBHelpers.createTable(testTableName, PARTITION_KEY, SORT_KEY);
+        StreamTableEnvironment streamTableEnvironment =
+                StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build());
+
+        final String createTableStmt = getCreateTableStmt();
+        streamTableEnvironment.executeSql(createTableStmt);
+
+        final String datagenStmt = getDatagenStmt(expectedNumOfElements);
+        streamTableEnvironment.executeSql(datagenStmt);

Review Comment:
   Agreed, will update this to use the Table API and TableEnvironment.





[GitHub] [flink-connector-aws] vahmed-hamdy commented on a diff in pull request #42: [FLINK-30229] Add SQL IT case for DynamoDB Sink

Posted by GitBox <gi...@apache.org>.
vahmed-hamdy commented on code in PR #42:
URL: https://github.com/apache/flink-connector-aws/pull/42#discussion_r1066881751


##########
flink-connector-dynamodb/pom.xml:
##########
@@ -132,5 +132,12 @@ under the License.
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner-loader</artifactId>

Review Comment:
   Previously we had separate modules for these IT tests; they were more like E2E tests for the uber SQL connector. Since we are not doing that with this approach, do we intend to have another E2E test?



##########
flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkITCase.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.table;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.connector.dynamodb.testutils.DynamoDBHelpers;
+import org.apache.flink.connector.dynamodb.testutils.DynamoDbContainer;
+import org.apache.flink.connector.dynamodb.util.DockerImageVersions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.testcontainers.containers.Network;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.net.URISyntaxException;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES;
+
+/** Integration test for {@link org.apache.flink.connector.dynamodb.table.DynamoDbDynamicSink}. */
+@Testcontainers
+@ExtendWith(MiniClusterExtension.class)
+public class DynamoDbDynamicSinkITCase {
+    private static final String PARTITION_KEY = "partition_key";
+    private static final String SORT_KEY = "sort_key";
+    private static DynamoDBHelpers dynamoDBHelpers;
+    private static String testTableName;
+    private static StreamExecutionEnvironment env;
+
+    // shared between test methods
+    @Container
+    public static final DynamoDbContainer LOCALSTACK =
+            new DynamoDbContainer(DockerImageName.parse(DockerImageVersions.DYNAMODB))
+                    .withCommand("-jar DynamoDBLocal.jar -inMemory -sharedDb")
+                    .withNetwork(Network.newNetwork())
+                    .withNetworkAliases("dynamodb");
+
+    @BeforeEach
+    public void setup() throws URISyntaxException {
+        testTableName = UUID.randomUUID().toString();
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        env.setParallelism(1);
+
+        dynamoDBHelpers = new DynamoDBHelpers(LOCALSTACK.getHostClient());
+    }
+
+    @Test
+    public void testSQLSink() throws ExecutionException, InterruptedException {
+
+        int expectedNumOfElements = 50;
+
+        dynamoDBHelpers.createTable(testTableName, PARTITION_KEY, SORT_KEY);
+        StreamTableEnvironment streamTableEnvironment =
+                StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build());
+
+        final String createTableStmt = getCreateTableStmt();
+        streamTableEnvironment.executeSql(createTableStmt);
+
+        final String datagenStmt = getDatagenStmt(expectedNumOfElements);
+        streamTableEnvironment.executeSql(datagenStmt);
+
+        final String insertSql = "INSERT INTO dynamo_db_table SELECT * from datagen;";
+        streamTableEnvironment.executeSql(insertSql).await();
+
+        Assertions.assertThat(dynamoDBHelpers.getItemsCount(testTableName))
+                .isEqualTo(expectedNumOfElements);
+    }
+
+    @Test
+    public void testTableAPISink() throws ExecutionException, InterruptedException {
+
+        int expectedNumOfElements = 50;
+
+        dynamoDBHelpers.createTable(testTableName, PARTITION_KEY, SORT_KEY);
+        StreamTableEnvironment streamTableEnvironment =
+                StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build());
+
+        final String createTableStmt = getCreateTableStmt();
+        streamTableEnvironment.executeSql(createTableStmt);
+
+        final String datagenStmt = getDatagenStmt(expectedNumOfElements);
+        streamTableEnvironment.executeSql(datagenStmt);
+
+        Table resultTable = streamTableEnvironment.sqlQuery("SELECT * FROM datagen");
+
+        resultTable.executeInsert("dynamo_db_table").await();
+
+        Assertions.assertThat(dynamoDBHelpers.getItemsCount(testTableName))
+                .isEqualTo(expectedNumOfElements);

Review Comment:
   Referring to the Firehose test: we used to see intermittent failures due to delays between sending records downstream and observing them in LocalStack. Is something similar not needed for DDB?
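
One way to address the timing concern in this comment would be to poll the item count until it matches or a timeout expires, rather than asserting once right after the job finishes. The following is a minimal sketch using only the JDK. `EventuallyAssert`, `pollUntilEquals`, and the timeout and interval values are hypothetical names and numbers chosen for illustration; they are not part of the PR or of Flink's test utilities.

```java
import java.time.Duration;
import java.util.function.IntSupplier;

/** Hypothetical helper (not part of the PR): polls a count until it reaches an expected value. */
final class EventuallyAssert {

    private EventuallyAssert() {}

    /**
     * Re-reads the count every {@code interval} until it equals {@code expected}
     * or {@code timeout} elapses. Returns the last observed value so that the
     * caller can run its usual assertion on it.
     */
    static int pollUntilEquals(
            IntSupplier actual, int expected, Duration timeout, Duration interval) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        int observed = actual.getAsInt();
        while (observed != expected && System.nanoTime() - deadline < 0) {
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop polling early.
                Thread.currentThread().interrupt();
                return observed;
            }
            observed = actual.getAsInt();
        }
        return observed;
    }
}
```

In the test quoted above, the supplier would wrap `dynamoDBHelpers.getItemsCount(testTableName)` (assuming it returns an `int`), and the returned value would feed the existing `isEqualTo(expectedNumOfElements)` assertion. Whether DynamoDBLocal needs this at all is the open question in the comment; the sketch only shows what the wait could look like.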



##########
flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkITCase.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.table;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.connector.dynamodb.testutils.DynamoDBHelpers;
+import org.apache.flink.connector.dynamodb.testutils.DynamoDbContainer;
+import org.apache.flink.connector.dynamodb.util.DockerImageVersions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.testcontainers.containers.Network;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.net.URISyntaxException;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES;
+
+/** Integration test for {@link org.apache.flink.connector.dynamodb.table.DynamoDbDynamicSink}. */
+@Testcontainers
+@ExtendWith(MiniClusterExtension.class)
+public class DynamoDbDynamicSinkITCase {
+    private static final String PARTITION_KEY = "partition_key";
+    private static final String SORT_KEY = "sort_key";
+    private static DynamoDBHelpers dynamoDBHelpers;
+    private static String testTableName;
+    private static StreamExecutionEnvironment env;
+
+    // shared between test methods
+    @Container
+    public static final DynamoDbContainer LOCALSTACK =
+            new DynamoDbContainer(DockerImageName.parse(DockerImageVersions.DYNAMODB))
+                    .withCommand("-jar DynamoDBLocal.jar -inMemory -sharedDb")
+                    .withNetwork(Network.newNetwork())
+                    .withNetworkAliases("dynamodb");
+
+    @BeforeEach
+    public void setup() throws URISyntaxException {
+        testTableName = UUID.randomUUID().toString();
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        env.setParallelism(1);
+
+        dynamoDBHelpers = new DynamoDBHelpers(LOCALSTACK.getHostClient());
+    }
+
+    @Test
+    public void testSQLSink() throws ExecutionException, InterruptedException {
+
+        int expectedNumOfElements = 50;
+
+        dynamoDBHelpers.createTable(testTableName, PARTITION_KEY, SORT_KEY);
+        StreamTableEnvironment streamTableEnvironment =
+                StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build());
+
+        final String createTableStmt = getCreateTableStmt();
+        streamTableEnvironment.executeSql(createTableStmt);
+
+        final String datagenStmt = getDatagenStmt(expectedNumOfElements);
+        streamTableEnvironment.executeSql(datagenStmt);
+
+        final String insertSql = "INSERT INTO dynamo_db_table SELECT * from datagen;";
+        streamTableEnvironment.executeSql(insertSql).await();
+
+        Assertions.assertThat(dynamoDBHelpers.getItemsCount(testTableName))
+                .isEqualTo(expectedNumOfElements);
+    }
+
+    @Test
+    public void testTableAPISink() throws ExecutionException, InterruptedException {
+
+        int expectedNumOfElements = 50;
+
+        dynamoDBHelpers.createTable(testTableName, PARTITION_KEY, SORT_KEY);
+        StreamTableEnvironment streamTableEnvironment =
+                StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build());
+
+        final String createTableStmt = getCreateTableStmt();
+        streamTableEnvironment.executeSql(createTableStmt);
+
+        final String datagenStmt = getDatagenStmt(expectedNumOfElements);
+        streamTableEnvironment.executeSql(datagenStmt);
+
+        Table resultTable = streamTableEnvironment.sqlQuery("SELECT * FROM datagen");
+
+        resultTable.executeInsert("dynamo_db_table").await();
+
+        Assertions.assertThat(dynamoDBHelpers.getItemsCount(testTableName))
+                .isEqualTo(expectedNumOfElements);
+    }
+
+    private static String getDatagenStmt(int expectedNumOfElements) {
+        return "CREATE TEMPORARY TABLE datagen\n"
+                + "WITH (\n"
+                + "    'connector' = 'datagen',\n"
+                + "    'number-of-rows' = '"
+                + expectedNumOfElements
+                + "'\n"
+                + ")\n"
+                + "LIKE dynamo_db_table (EXCLUDING ALL);";
+    }
+
+    private static String getCreateTableStmt() {

Review Comment:
   +1 for resource usage





[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #42: [FLINK-30229] Add SQL IT case for DynamoDB Sink

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on code in PR #42:
URL: https://github.com/apache/flink-connector-aws/pull/42#discussion_r1066842600


##########
flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkITCase.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.table;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.connector.dynamodb.testutils.DynamoDBHelpers;
+import org.apache.flink.connector.dynamodb.testutils.DynamoDbContainer;
+import org.apache.flink.connector.dynamodb.util.DockerImageVersions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.testcontainers.containers.Network;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.net.URISyntaxException;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES;
+
+/** Integration test for {@link org.apache.flink.connector.dynamodb.table.DynamoDbDynamicSink}. */
+@Testcontainers
+@ExtendWith(MiniClusterExtension.class)
+public class DynamoDbDynamicSinkITCase {
+    private static final String PARTITION_KEY = "partition_key";
+    private static final String SORT_KEY = "sort_key";
+    private static DynamoDBHelpers dynamoDBHelpers;
+    private static String testTableName;
+    private static StreamExecutionEnvironment env;
+
+    // shared between test methods
+    @Container
+    public static final DynamoDbContainer LOCALSTACK =
+            new DynamoDbContainer(DockerImageName.parse(DockerImageVersions.DYNAMODB))
+                    .withCommand("-jar DynamoDBLocal.jar -inMemory -sharedDb")
+                    .withNetwork(Network.newNetwork())
+                    .withNetworkAliases("dynamodb");
+
+    @BeforeEach
+    public void setup() throws URISyntaxException {
+        testTableName = UUID.randomUUID().toString();
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        env.setParallelism(1);
+
+        dynamoDBHelpers = new DynamoDBHelpers(LOCALSTACK.getHostClient());
+    }
+
+    @Test
+    public void testSQLSink() throws ExecutionException, InterruptedException {
+
+        int expectedNumOfElements = 50;
+
+        dynamoDBHelpers.createTable(testTableName, PARTITION_KEY, SORT_KEY);
+        StreamTableEnvironment streamTableEnvironment =
+                StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build());
+
+        final String createTableStmt = getCreateTableStmt();
+        streamTableEnvironment.executeSql(createTableStmt);
+
+        final String datagenStmt = getDatagenStmt(expectedNumOfElements);
+        streamTableEnvironment.executeSql(datagenStmt);
+
+        final String insertSql = "INSERT INTO dynamo_db_table SELECT * from datagen;";
+        streamTableEnvironment.executeSql(insertSql).await();
+
+        Assertions.assertThat(dynamoDBHelpers.getItemsCount(testTableName))
+                .isEqualTo(expectedNumOfElements);
+    }
+
+    @Test
+    public void testTableAPISink() throws ExecutionException, InterruptedException {
+
+        int expectedNumOfElements = 50;
+
+        dynamoDBHelpers.createTable(testTableName, PARTITION_KEY, SORT_KEY);
+        StreamTableEnvironment streamTableEnvironment =
+                StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build());
+
+        final String createTableStmt = getCreateTableStmt();
+        streamTableEnvironment.executeSql(createTableStmt);
+
+        final String datagenStmt = getDatagenStmt(expectedNumOfElements);
+        streamTableEnvironment.executeSql(datagenStmt);
+
+        Table resultTable = streamTableEnvironment.sqlQuery("SELECT * FROM datagen");
+
+        resultTable.executeInsert("dynamo_db_table").await();
+
+        Assertions.assertThat(dynamoDBHelpers.getItemsCount(testTableName))
+                .isEqualTo(expectedNumOfElements);
+    }
+
+    private static String getDatagenStmt(int expectedNumOfElements) {
+        return "CREATE TEMPORARY TABLE datagen\n"
+                + "WITH (\n"
+                + "    'connector' = 'datagen',\n"
+                + "    'number-of-rows' = '"
+                + expectedNumOfElements
+                + "'\n"
+                + ")\n"
+                + "LIKE dynamo_db_table (EXCLUDING ALL);";
+    }
+
+    private static String getCreateTableStmt() {

Review Comment:
   Q: Can we move this to a Java resource file? The same applies to the other statements.
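
   A minimal sketch of what this suggestion could look like, assuming the statement is kept as a template in a test resource and filled in at runtime. The `SqlTemplates` class, the `${rows}` placeholder syntax, and the `/datagen.sql` file name are hypothetical and not part of the PR; the template is inlined as a string here only so the sketch runs on its own.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper, not part of the PR: renders a SQL template read from a stream. */
final class SqlTemplates {

    /** Reads the stream as UTF-8 text and replaces every ${key} with its mapped value. */
    static String render(InputStream in, Map<String, String> values) {
        String sql =
                new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
                        .lines()
                        .collect(Collectors.joining("\n"));
        for (Map.Entry<String, String> entry : values.entrySet()) {
            sql = sql.replace("${" + entry.getKey() + "}", entry.getValue());
        }
        return sql;
    }

    public static void main(String[] args) {
        // A test would instead open the file from the classpath, e.g.
        // SqlTemplates.class.getResourceAsStream("/datagen.sql") (assumed file name).
        String template =
                "CREATE TEMPORARY TABLE datagen\n"
                        + "WITH (\n"
                        + "    'connector' = 'datagen',\n"
                        + "    'number-of-rows' = '${rows}'\n"
                        + ")\n"
                        + "LIKE dynamo_db_table (EXCLUDING ALL);";
        InputStream in = new ByteArrayInputStream(template.getBytes(StandardCharsets.UTF_8));
        System.out.println(render(in, Collections.singletonMap("rows", "50")));
    }
}
```

   The rendered string could then be passed to `executeSql` in the same way the concatenated statement is today.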



##########
flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkITCase.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.table;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.connector.dynamodb.testutils.DynamoDBHelpers;
+import org.apache.flink.connector.dynamodb.testutils.DynamoDbContainer;
+import org.apache.flink.connector.dynamodb.util.DockerImageVersions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.testcontainers.containers.Network;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.net.URISyntaxException;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES;
+
+/** Integration test for {@link org.apache.flink.connector.dynamodb.table.DynamoDbDynamicSink}. */
+@Testcontainers
+@ExtendWith(MiniClusterExtension.class)
+public class DynamoDbDynamicSinkITCase {
+    private static final String PARTITION_KEY = "partition_key";
+    private static final String SORT_KEY = "sort_key";
+    private static DynamoDBHelpers dynamoDBHelpers;
+    private static String testTableName;
+    private static StreamExecutionEnvironment env;
+
+    // shared between test methods
+    @Container
+    public static final DynamoDbContainer LOCALSTACK =
+            new DynamoDbContainer(DockerImageName.parse(DockerImageVersions.DYNAMODB))
+                    .withCommand("-jar DynamoDBLocal.jar -inMemory -sharedDb")
+                    .withNetwork(Network.newNetwork())
+                    .withNetworkAliases("dynamodb");
+
+    @BeforeEach
+    public void setup() throws URISyntaxException {
+        testTableName = UUID.randomUUID().toString();
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        env.setParallelism(1);
+
+        dynamoDBHelpers = new DynamoDBHelpers(LOCALSTACK.getHostClient());
+    }
+
+    @Test
+    public void testSQLSink() throws ExecutionException, InterruptedException {
+
+        int expectedNumOfElements = 50;
+
+        dynamoDBHelpers.createTable(testTableName, PARTITION_KEY, SORT_KEY);
+        StreamTableEnvironment streamTableEnvironment =
+                StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build());
+
+        final String createTableStmt = getCreateTableStmt();
+        streamTableEnvironment.executeSql(createTableStmt);
+
+        final String datagenStmt = getDatagenStmt(expectedNumOfElements);
+        streamTableEnvironment.executeSql(datagenStmt);
+
+        final String insertSql = "INSERT INTO dynamo_db_table SELECT * from datagen;";
+        streamTableEnvironment.executeSql(insertSql).await();
+
+        Assertions.assertThat(dynamoDBHelpers.getItemsCount(testTableName))
+                .isEqualTo(expectedNumOfElements);
+    }
+
+    @Test
+    public void testTableAPISink() throws ExecutionException, InterruptedException {
+
+        int expectedNumOfElements = 50;
+
+        dynamoDBHelpers.createTable(testTableName, PARTITION_KEY, SORT_KEY);
+        StreamTableEnvironment streamTableEnvironment =
+                StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build());
+
+        final String createTableStmt = getCreateTableStmt();
+        streamTableEnvironment.executeSql(createTableStmt);
+
+        final String datagenStmt = getDatagenStmt(expectedNumOfElements);
+        streamTableEnvironment.executeSql(datagenStmt);

Review Comment:
   Can we also use the Table API for this?





[GitHub] [flink-connector-aws] darenwkt commented on a diff in pull request #42: [FLINK-30229] Add SQL IT case for DynamoDB Sink

Posted by GitBox <gi...@apache.org>.
darenwkt commented on code in PR #42:
URL: https://github.com/apache/flink-connector-aws/pull/42#discussion_r1071181578


##########
flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkITCase.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.table;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.connector.dynamodb.testutils.DynamoDBHelpers;
+import org.apache.flink.connector.dynamodb.testutils.DynamoDbContainer;
+import org.apache.flink.connector.dynamodb.util.DockerImageVersions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.testcontainers.containers.Network;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.net.URISyntaxException;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES;
+
+/** Integration test for {@link org.apache.flink.connector.dynamodb.table.DynamoDbDynamicSink}. */
+@Testcontainers
+@ExtendWith(MiniClusterExtension.class)
+public class DynamoDbDynamicSinkITCase {
+    private static final String PARTITION_KEY = "partition_key";
+    private static final String SORT_KEY = "sort_key";
+    private static DynamoDBHelpers dynamoDBHelpers;
+    private static String testTableName;
+    private static StreamExecutionEnvironment env;
+
+    // shared between test methods
+    @Container
+    public static final DynamoDbContainer LOCALSTACK =
+            new DynamoDbContainer(DockerImageName.parse(DockerImageVersions.DYNAMODB))
+                    .withCommand("-jar DynamoDBLocal.jar -inMemory -sharedDb")
+                    .withNetwork(Network.newNetwork())
+                    .withNetworkAliases("dynamodb");
+
+    @BeforeEach
+    public void setup() throws URISyntaxException {
+        testTableName = UUID.randomUUID().toString();
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        env.setParallelism(1);
+
+        dynamoDBHelpers = new DynamoDBHelpers(LOCALSTACK.getHostClient());
+    }
+
+    @Test
+    public void testSQLSink() throws ExecutionException, InterruptedException {
+
+        int expectedNumOfElements = 50;
+
+        dynamoDBHelpers.createTable(testTableName, PARTITION_KEY, SORT_KEY);
+        StreamTableEnvironment streamTableEnvironment =
+                StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build());
+
+        final String createTableStmt = getCreateTableStmt();
+        streamTableEnvironment.executeSql(createTableStmt);
+
+        final String datagenStmt = getDatagenStmt(expectedNumOfElements);
+        streamTableEnvironment.executeSql(datagenStmt);
+
+        final String insertSql = "INSERT INTO dynamo_db_table SELECT * from datagen;";
+        streamTableEnvironment.executeSql(insertSql).await();
+
+        Assertions.assertThat(dynamoDBHelpers.getItemsCount(testTableName))
+                .isEqualTo(expectedNumOfElements);
+    }
+
+    @Test
+    public void testTableAPISink() throws ExecutionException, InterruptedException {
+
+        int expectedNumOfElements = 50;
+
+        dynamoDBHelpers.createTable(testTableName, PARTITION_KEY, SORT_KEY);
+        StreamTableEnvironment streamTableEnvironment =
+                StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build());
+
+        final String createTableStmt = getCreateTableStmt();
+        streamTableEnvironment.executeSql(createTableStmt);
+
+        final String datagenStmt = getDatagenStmt(expectedNumOfElements);
+        streamTableEnvironment.executeSql(datagenStmt);
+
+        Table resultTable = streamTableEnvironment.sqlQuery("SELECT * FROM datagen");
+
+        resultTable.executeInsert("dynamo_db_table").await();
+
+        Assertions.assertThat(dynamoDBHelpers.getItemsCount(testTableName))
+                .isEqualTo(expectedNumOfElements);

Review Comment:
   The preceding line, `resultTable.executeInsert("dynamo_db_table").await();`, is a synchronous operation that waits until all records have been sent successfully. So the delay shouldn't cause failures here unless the await times out.
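
   The ordering argument can be illustrated without Flink. In the sketch below a `CompletableFuture` stands in for the job result and an `AtomicInteger` stands in for the DynamoDB table; `AwaitBeforeAssert`, `submitJob`, and `runAndCount` are hypothetical names used only for illustration. Because the caller blocks on the future before reading the count, the count is only read after every record has been written.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

/** Stand-in for "submit a bounded job, await it, then count". This is not Flink code. */
final class AwaitBeforeAssert {

    /** Asynchronously "writes" n records into the counter, like a bounded insert job. */
    static CompletableFuture<Void> submitJob(AtomicInteger sink, int n) {
        return CompletableFuture.runAsync(
                () -> {
                    for (int i = 0; i < n; i++) {
                        sink.incrementAndGet();
                    }
                });
    }

    /** Blocks until the job finishes or the timeout elapses, then reads the count. */
    static int runAndCount(int n, long timeoutSeconds) {
        AtomicInteger sink = new AtomicInteger();
        try {
            submitJob(sink, n).get(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the job", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("job did not finish cleanly", e);
        }
        return sink.get();
    }

    public static void main(String[] args) {
        // The count is read only after the blocking get() has returned.
        System.out.println(runAndCount(50, 10)); // prints 50
    }
}
```

   In the test itself, the no-argument `await()` blocks until the job finishes; Flink's `TableResult` also offers an `await(long, TimeUnit)` overload if an explicit upper bound on the wait is wanted.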





[GitHub] [flink-connector-aws] darenwkt commented on a diff in pull request #42: [FLINK-30229] Add SQL IT case for DynamoDB Sink

Posted by GitBox <gi...@apache.org>.
darenwkt commented on code in PR #42:
URL: https://github.com/apache/flink-connector-aws/pull/42#discussion_r1071180326


##########
flink-connector-dynamodb/pom.xml:
##########
@@ -132,5 +132,12 @@ under the License.
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner-loader</artifactId>

Review Comment:
   Yes, I was thinking that the E2E test would run against an actual DynamoDB, whereas this IT case runs against a local DynamoDB container.





[GitHub] [flink-connector-aws] dannycranmer merged pull request #42: [FLINK-30229] Add SQL IT case for DynamoDB Sink

Posted by "dannycranmer (via GitHub)" <gi...@apache.org>.
dannycranmer merged PR #42:
URL: https://github.com/apache/flink-connector-aws/pull/42

