Posted to issues@flink.apache.org by "Ahmed Hamdy (Jira)" <ji...@apache.org> on 2024/04/06 15:45:00 UTC

[jira] [Created] (FLINK-35022) Add TypeInformed Element Converter for DynamoDbSink

Ahmed Hamdy created FLINK-35022:
-----------------------------------

             Summary: Add TypeInformed Element Converter for DynamoDbSink
                 Key: FLINK-35022
                 URL: https://issues.apache.org/jira/browse/FLINK-35022
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / DynamoDB
    Affects Versions: aws-connector-4.3.0
            Reporter: Ahmed Hamdy


h2. Context
{{DynamoDbSink}}, as an extension of {{AsyncSinkBase}}, depends on {{org.apache.flink.connector.base.sink.writer.ElementConverter}} to convert Flink stream objects to DynamoDb write requests, where each item is represented as {{Map<String, AttributeValue>}} [1].

{{AttributeValue}} is the wrapper for objects that DynamoDb understands. It uses a format in which every value carries a type identifier, as in
{"M": {"Name": {"S": "Joe"}, "Age": {"N": "35"}}}.
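To make the typed format concrete, here is a minimal sketch that builds the example above from plain strings. It deliberately avoids the AWS SDK: {{TypedAttributeSketch}} and its helpers {{s}}, {{n}} and {{m}} are hypothetical names used only for illustration, and the sketch does not escape special characters in values.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Illustrative stand-in for the typed attribute format; it does not use the AWS SDK. */
final class TypedAttributeSketch {

    /** Wraps a string value with the "S" type identifier. */
    static String s(String value) {
        return "{\"S\": \"" + value + "\"}";
    }

    /** Wraps a number with the "N" type identifier; numbers are carried as strings. */
    static String n(Number value) {
        return "{\"N\": \"" + value + "\"}";
    }

    /** Wraps already-encoded attributes with the "M" (map) type identifier. */
    static String m(Map<String, String> attributes) {
        StringJoiner joiner = new StringJoiner(", ", "{\"M\": {", "}}");
        attributes.forEach((name, encoded) -> joiner.add("\"" + name + "\": " + encoded));
        return joiner.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so "Name" is rendered before "Age".
        Map<String, String> person = new LinkedHashMap<>();
        person.put("Name", s("Joe"));
        person.put("Age", n(35));
        System.out.println(m(person));
        // prints {"M": {"Name": {"S": "Joe"}, "Age": {"N": "35"}}}
    }
}
```

In the real connector the same nesting is expressed through {{AttributeValue}} builders rather than strings. The point here is only that every value, including each map entry, is tagged with its type.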

Since {{TypeInformation}} is already natively supported in Flink, many implementations of the DynamoDb {{ElementConverter}} are just boilerplate.
For example, consider the following POJO:
{code:title="Simple POJO Element Conversion"}
public class Order {
    String id;
    int quantity;
    double total;
}
{code}

The converter for it must be implemented as follows:

{code:title="Simple POJO DDB Element Converter"}
public static class SimplePojoElementConverter implements ElementConverter<Order, DynamoDbWriteRequest> {

    @Override
    public DynamoDbWriteRequest apply(Order order, SinkWriter.Context context) {
        // Every field has to be wrapped by hand in the matching AttributeValue type.
        Map<String, AttributeValue> itemMap = new HashMap<>();
        itemMap.put("id", AttributeValue.builder().s(order.id).build());
        itemMap.put("quantity", AttributeValue.builder().n(String.valueOf(order.quantity)).build());
        itemMap.put("total", AttributeValue.builder().n(String.valueOf(order.total)).build());
        return DynamoDbWriteRequest.builder()
                .setType(DynamoDbWriteRequestType.PUT)
                .setItem(itemMap)
                .build();
    }

    @Override
    public void open(Sink.InitContext context) {
        // Nothing to initialize.
    }
}
{code}

While this is not a lot of work, it is a fairly common case in Flink, and the implementation requires a fair knowledge of the DDB data model, which new users may not have.

h2. Proposal 

Introduce {{DynamoDbTypeInformedElementConverter}} as follows:

{code:title="TypeInformedElementConverter"}
public class DynamoDbTypeInformedElementConverter<InputT> implements ElementConverter<InputT, DynamoDbWriteRequest> {

    private final CompositeType<InputT> typeInfo;

    public DynamoDbTypeInformedElementConverter(CompositeType<InputT> typeInfo) {
        this.typeInfo = typeInfo;
    }

    // Pseudocode: pick the AttributeValue conversion that matches the type information.
    private AttributeValue convert(TypeInformation<?> type, Object value) {
        switch (type) {
            case BasicTypeInfo.STRING_TYPE_INFO:
                return AttributeValue.fromS(value.toString());
            case BasicTypeInfo.SHORT_TYPE_INFO:
            case BasicTypeInfo.INT_TYPE_INFO:
                return AttributeValue.fromN(value.toString());
            case TupleTypeInfo:
                return AttributeValue.fromL(convertTuple(value));
            .....
        }
    }
}

// User Code
public static void main(String[] args) {
    DynamoDbTypeInformedElementConverter<Order> elementConverter =
            new DynamoDbTypeInformedElementConverter<>((CompositeType<Order>) TypeInformation.of(Order.class));
    DdbSink.setElementConverter(elementConverter);
}
{code}

We will start by supporting all POJO, basic, Tuple and Array type information, which should be enough to cover all DDB supported types (s, n, bool, b, ss, ns, bs, bools, m, l).
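The idea of deriving the DDB type from the field type can be sketched without Flink or the AWS SDK. The example below is only an approximation under stated assumptions: it uses Java reflection as a stand-in for Flink's {{TypeInformation}}, represents each attribute as a one-entry map from type identifier to value instead of an {{AttributeValue}}, and covers only strings, numbers and booleans. {{TypeInformedSketch}}, {{tagFor}} and {{toItem}} are hypothetical names, not part of the proposed API.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: derives DDB type identifiers from Java field types via reflection. */
final class TypeInformedSketch {

    /** Same shape as the Order POJO in the issue, with a constructor added for the demo. */
    static class Order {
        String id;
        int quantity;
        double total;

        Order(String id, int quantity, double total) {
            this.id = id;
            this.quantity = quantity;
            this.total = total;
        }
    }

    /** Returns the type identifier ("S", "N" or "BOOL") for a supported field type. */
    static String tagFor(Class<?> type) {
        if (type == String.class) {
            return "S";
        }
        if (type == boolean.class || type == Boolean.class) {
            return "BOOL";
        }
        if (Number.class.isAssignableFrom(type)
                || type == byte.class || type == short.class || type == int.class
                || type == long.class || type == float.class || type == double.class) {
            return "N";
        }
        throw new IllegalArgumentException("Unsupported field type: " + type.getName());
    }

    /** Converts every non-static field into: field name -> (type identifier -> value). */
    static Map<String, Map<String, Object>> toItem(Object pojo) {
        Map<String, Map<String, Object>> item = new LinkedHashMap<>();
        for (Field field : pojo.getClass().getDeclaredFields()) {
            if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                continue;
            }
            try {
                field.setAccessible(true);
                Object value = field.get(pojo); // null handling is omitted in this sketch
                String tag = tagFor(field.getType());
                Map<String, Object> attribute = new LinkedHashMap<>();
                // Strings and numbers are carried as strings; booleans keep their type.
                attribute.put(tag, "BOOL".equals(tag) ? value : String.valueOf(value));
                item.put(field.getName(), attribute);
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Cannot read field " + field.getName(), e);
            }
        }
        return item;
    }

    public static void main(String[] args) {
        // No per-field conversion code is written by the user.
        System.out.println(toItem(new Order("order-1", 2, 19.99)));
    }
}
```

The user supplies only the class, and the conversion per field follows from its declared type, which is the same division of labour the proposal aims for with {{TypeInformation}}.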

[1] https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/dynamodb/model/AttributeValue.html



--
This message was sent by Atlassian Jira
(v8.20.10#820010)