Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/02 19:38:26 UTC

[GitHub] [flink-connector-dynamodb] dannycranmer commented on a diff in pull request #1: [FLINK-24229][Connectors][DynamoDB] - Add AWS DynamoDB connector

dannycranmer commented on code in PR #1:
URL: https://github.com/apache/flink-connector-dynamodb/pull/1#discussion_r1012223894


##########
flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbSinkWriter.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.streaming.connectors.dynamodb.util.PrimaryKeyBuilder;
+import org.apache.flink.util.CollectionUtil;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
+import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
+import software.amazon.awssdk.services.dynamodb.model.PutRequest;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
+import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier;
+import static org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier;
+
+/**
+ * Sink writer created by {@link DynamoDbSink} to write to DynamoDB. More details on the operation
+ * of this sink writer may be found in the doc for {@link DynamoDbSink}. More details on the
+ * internals of this sink writer may be found in {@link AsyncSinkWriter}.
+ *
+ * <p>The {@link DynamoDbAsyncClient} used here may be configured in the standard way for the AWS
+ * SDK 2.x, e.g. by providing {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables.
+ */
+@Internal
+class DynamoDbSinkWriter<InputT> extends AsyncSinkWriter<InputT, DynamoDbWriteRequest> {
+    private static final Logger LOG = LoggerFactory.getLogger(DynamoDbSinkWriter.class);
+
+    private static final FatalExceptionClassifier RESOURCE_NOT_FOUND_EXCEPTION_CLASSIFIER =
+            FatalExceptionClassifier.withRootCauseOfType(
+                    ResourceNotFoundException.class,
+                    err ->
+                            new DynamoDbSinkException(
+                                    "Encountered non-recoverable exception relating to not being able to find the specified resources",
+                                    err));
+
+    private static final FatalExceptionClassifier CONDITIONAL_CHECK_FAILED_EXCEPTION_CLASSIFIER =
+            FatalExceptionClassifier.withRootCauseOfType(
+                    ConditionalCheckFailedException.class,
+                    err ->
+                            new DynamoDbSinkException(
+                                    "Encountered non-recoverable exception relating to failed conditional check",
+                                    err));
+
+    /**
+     * Validation exceptions are not retryable. See
+     * https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Programming.Errors.html.
+     */
+    private static final FatalExceptionClassifier VALIDATION_EXCEPTION_CLASSIFIER =
+            new FatalExceptionClassifier(
+                    (err) ->
+                            err instanceof DynamoDbException
+                                    && ((DynamoDbException) err)
+                                            .awsErrorDetails()
+                                            .errorCode()
+                                            .equalsIgnoreCase("ValidationException"),
+                    err ->
+                            new DynamoDbSinkException(
+                                    "Encountered non-recoverable exception because of DynamoDB request validation",
+                                    err));
+
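+    // The first classifier in the chain that matches maps the error to a
+    // fatal exception; unmatched errors are treated as non-fatal.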
+    private static final FatalExceptionClassifier DYNAMODB_FATAL_EXCEPTION_CLASSIFIER =
+            FatalExceptionClassifier.createChain(
+                    getInterruptedExceptionClassifier(),
+                    getInvalidCredentialsExceptionClassifier(),
+                    RESOURCE_NOT_FOUND_EXCEPTION_CLASSIFIER,
+                    CONDITIONAL_CHECK_FAILED_EXCEPTION_CLASSIFIER,
+                    VALIDATION_EXCEPTION_CLASSIFIER,
+                    getSdkClientMisconfiguredExceptionClassifier());
+
+    /* A counter for the total number of records that have encountered an error during put */
+    private final Counter numRecordsSendErrorsCounter;
+
+    /* A counter for the total number of records that were returned by DynamoDB as unprocessed */
+    private final Counter numRecordsReturnedAsUnprocessedCounter;
+
+    /* The sink writer metric group */
+    private final SinkWriterMetricGroup metrics;
+
+    /* The asynchronous http client for the asynchronous DynamoDb client */
+    private final SdkAsyncHttpClient httpClient;
+    private final DynamoDbAsyncClient client;
+    private final boolean failOnError;
+    private final String tableName;
+
+    private final List<String> overwriteByPartitionKeys;
+
+    public DynamoDbSinkWriter(
+            ElementConverter<InputT, DynamoDbWriteRequest> elementConverter,
+            InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes,
+            boolean failOnError,
+            String tableName,
+            List<String> overwriteByPartitionKeys,
+            Properties dynamoDbClientProperties,
+            Collection<BufferedRequestState<DynamoDbWriteRequest>> states) {
+        super(
+                elementConverter,
+                context,
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBatchSizeInBytes,
+                maxTimeInBufferMS,
+                maxRecordSizeInBytes,
+                states);
+        this.failOnError = failOnError;
+        this.tableName = tableName;
+        this.overwriteByPartitionKeys = overwriteByPartitionKeys;
+        this.metrics = context.metricGroup();
+        this.numRecordsSendErrorsCounter = metrics.getNumRecordsSendErrorsCounter();
+        this.numRecordsReturnedAsUnprocessedCounter =
+                metrics.counter("numRecordsReturnedAsUnprocessedCounter");
+        this.httpClient = AWSGeneralUtil.createAsyncHttpClient(dynamoDbClientProperties);
+        this.client =
+                AWSAsyncSinkUtil.createAwsAsyncClient(
+                        dynamoDbClientProperties,
+                        httpClient,
+                        DynamoDbAsyncClient.builder(),
+                        DynamoDbConfigConstants.BASE_DYNAMODB_USER_AGENT_PREFIX_FORMAT,
+                        DynamoDbConfigConstants.DYNAMODB_CLIENT_USER_AGENT_PREFIX);
+    }
+
+    @Override
+    protected void submitRequestEntries(
+            List<DynamoDbWriteRequest> requestEntries,
+            Consumer<List<DynamoDbWriteRequest>> requestResultConsumer) {
+
+        List<WriteRequest> items = new ArrayList<>();
+
+        if (CollectionUtil.isNullOrEmpty(overwriteByPartitionKeys)) {
+            for (DynamoDbWriteRequest request : requestEntries) {
+                items.add(convertToWriteRequest(request));
+            }
+        } else {
+            // BatchWriteItem rejects batches that contain duplicate primary
+            // keys, so deduplicate on the configured keys, keeping the last
+            // write for each key
+            Map<String, WriteRequest> container = new HashMap<>();
+            PrimaryKeyBuilder keyBuilder = new PrimaryKeyBuilder(overwriteByPartitionKeys);
+            for (DynamoDbWriteRequest request : requestEntries) {
+                WriteRequest req = convertToWriteRequest(request);
+                container.put(keyBuilder.build(req), req);
+            }
+            items.addAll(container.values());
+        }
+
+        CompletableFuture<BatchWriteItemResponse> future =
+                client.batchWriteItem(
+                        BatchWriteItemRequest.builder()
+                                .requestItems(ImmutableMap.of(tableName, items))
+                                .build());
+
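+        // Three outcomes: the whole batch failed (decide between retry and
+        // fatal), some items came back unprocessed (retry only those), or
+        // everything was persisted (complete with an empty retry list).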
+        future.whenComplete(
+                (response, err) -> {
+                    if (err != null) {
+                        handleFullyFailedRequest(err, requestEntries, requestResultConsumer);
+                    } else if (!CollectionUtil.isNullOrEmpty(response.unprocessedItems())) {
+                        handlePartiallyUnprocessedRequest(response, requestResultConsumer);
+                    } else {
+                        requestResultConsumer.accept(Collections.emptyList());
+                    }
+                });
+    }
+
+    private void handlePartiallyUnprocessedRequest(
+            BatchWriteItemResponse response, Consumer<List<DynamoDbWriteRequest>> requestResult) {
+        List<DynamoDbWriteRequest> unprocessed = new ArrayList<>();
+
+        for (List<WriteRequest> writeRequests : response.unprocessedItems().values()) {
+            for (WriteRequest writeRequest : writeRequests) {
+                unprocessed.add(convertToDynamoDbWriteRequest(writeRequest));
+            }
+        }
+
+        LOG.warn("DynamoDB Sink failed to persist and will retry {} entries.", unprocessed.size());
+        numRecordsReturnedAsUnprocessedCounter.inc(unprocessed.size());
+
+        requestResult.accept(unprocessed);
+    }
+
+    private void handleFullyFailedRequest(
+            Throwable err,
+            List<DynamoDbWriteRequest> requestEntries,
+            Consumer<List<DynamoDbWriteRequest>> requestResult) {
+        LOG.warn(
+                "DynamoDB Sink failed to persist {} entries.",
+                requestEntries.size(),
+                err);
+        numRecordsSendErrorsCounter.inc(requestEntries.size());
+
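+        // If the error is fatal, isRetryable surfaces it via the fatal
+        // exception consumer and the entries are not re-queued.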
+        if (isRetryable(err.getCause())) {
+            requestResult.accept(requestEntries);
+        }
+    }
+
+    private boolean isRetryable(Throwable err) {
+        if (!DYNAMODB_FATAL_EXCEPTION_CLASSIFIER.isFatal(err, getFatalExceptionCons())) {
+            return false;
+        }
+        if (failOnError) {
+            getFatalExceptionCons()
+                    .accept(new DynamoDbSinkException.DynamoDbSinkFailFastException(err));
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    protected long getSizeInBytes(DynamoDbWriteRequest requestEntry) {
+        // DynamoDB calculates item size as the sum of all attribute names and
+        // values, but computing that on every operation may be too expensive,
+        // so this is just an estimate
+        return requestEntry.getItem().toString().getBytes(StandardCharsets.UTF_8).length;

Review Comment:
   As discussed offline, it is not practical to compute `sizeInBytes` here, given that:
   - We do not know how the client serializes the request payload
   - It would be an expensive operation, and we would potentially be serializing each record twice
   
   We agreed to:
   - Return a dummy constant value here (see the sketch below)
   - Follow up with an improvement that makes this flush strategy and its validation optional
   
   The failure modes are already covered:
   - Total batch size too large: handled organically by the AIMD (additive increase/multiplicative decrease) strategy of the async sink base
   - Single `DynamoDbWriteRequest` too large: sending a single request that is too large results in a fatal validation exception
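   
   For illustration, the agreed interim change could look roughly like the
   sketch below; the constant name and its value are placeholders rather than
   the final implementation:
   
   ```java
   // Hypothetical sketch: report a fixed per-record size estimate instead of
   // serializing each record to measure it. An accurate size is not needed:
   // an oversized batch is shrunk organically by AIMD, and an oversized
   // single item fails fast with a fatal validation exception.
   private static final long DUMMY_REQUEST_SIZE_IN_BYTES = 1_000; // placeholder
   
   @Override
   protected long getSizeInBytes(DynamoDbWriteRequest requestEntry) {
       return DUMMY_REQUEST_SIZE_IN_BYTES;
   }
   ```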


