You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Jaswin Shah <ja...@outlook.com> on 2020/05/22 06:57:17 UTC

Flink TTL for MapStates and Sideoutputs implementations

public class CartPGCoprocessFunction extends KeyedCoProcessFunction<String,CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {

    private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class);

    /**
     * Map state for cart messages, orderId+mid is key and cartMessage is value.
     */
    private static MapState<String, CartMessage> cartState = null;

    /**
     * Map state for pg messages, orderId+mid is key and pgMessage is value.
     */
    private static MapState<String, PaymentNotifyRequestWrapper> pgState = null;

    /**
     * Intializations for cart and pg mapStates
     *
     * @param config
     */
    @Override
    public void open(Configuration config) {
        MapStateDescriptor<String, CartMessage> cartStateDescriptor = new MapStateDescriptor<> (
            Constants.CART_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(CartMessage.class)
        );
        cartState = getRuntimeContext().getMapState(cartStateDescriptor);

        MapStateDescriptor<String, PaymentNotifyRequestWrapper> pgStateDescriptor = new MapStateDescriptor<>(
            Constants.PG_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(PaymentNotifyRequestWrapper.class)
        );
        pgState = getRuntimeContext().getMapState(pgStateDescriptor);
    }

    /**
     * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from pgMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in cartMapState.
     * @param cartMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        String searchKey = cartMessage.createJoinStringCondition();
        PaymentNotifyRequestWrapper paymentNotifyObject = pgState.get(searchKey);
        if(Objects.nonNull(paymentNotifyObject)) {
            generateResultMessage(cartMessage,paymentNotifyObject,collector);
            pgState.remove(searchKey);
        } else {
            cartState.put(searchKey,cartMessage);
        }
    }

    /**
     * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from cartMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in pgMapState.
     * @param pgMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        String searchKey = pgMessage.createJoinStringCondition();
        CartMessage cartMessage = cartState.get(searchKey);
        if(Objects.nonNull(cartMessage)) {
            generateResultMessage(cartMessage,pgMessage,collector);
            cartState.remove(searchKey);
        } else {
            pgState.put(searchKey,pgMessage);
        }
    }

    /**
     * Create ResultMessage from cart and pg messages.
     *
     * @param cartMessage
     * @param pgMessage
     * @return
     */
    private void generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper pgMessage,Collector<ResultMessage> collector) {
        ResultMessage resultMessage = new ResultMessage();
        Payment payment = null;

        //Logic should be in cart: check
        for (Payment pay : cartMessage.getPayments()) {
            if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG()) && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
                payment = pay;
                break;
            }
        }
        if(Objects.isNull(payment)) {
            return;
        }

        resultMessage.setOrderId(cartMessage.getId());
        resultMessage.setMid(payment.getMid());

        resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
        resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());

        resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
        resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());

        resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
        resultMessage.setCartOrderAmount(String.valueOf(cartMessage.getGrandtotal().longValue()));

        resultMessage.setCartPaymethod(payment.getPayment_method());
        resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());

        checkDescripancyAndCollectResult(resultMessage,collector);
    }

    /**
     * Evaluate if there is descripancy of any fields between the messages from two different systems.
     * Write all the descripancy logic here.
     *
     * @param resultMessage
     */
    private void checkDescripancyAndCollectResult(ResultMessage resultMessage, Collector<ResultMessage> collector) {

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!StringUtils.equals(resultMessage.getCartOrderAmount(), resultMessage.getPgOrderAmount())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }
    }
}

Hi,
I have implemented the flink job with MapStates. The functionality is like,

  1.  I have two datastreams which I connect with connect operator and then call coprocessfunction with every pair of objects.
  2.  For element of first datastream, processElement1 method is called and for an element of second datastream, processElement2 method is called.
  3.  I have two MapStates in CoProcessFunction for both streams separately.
  4.  When processElement1 is called, it checks in MapState2 if corresponding element with given id is present, if present, I match, and delete. If not present, I add the object in MapState1.
  5.  When processElement2 is called, it checks in MapState1 if corresponding element with given id is present, if present, I match and delete. I fnot present I add object in MapState2.
  6.  Now, I want all the state data to be stored in Rocksdb.
  7.  After few days, I want to run a batch streaming job on Rocksdb to check if there are any objects which have not match found to create a report of those.

I need a help on how to configure TTL for messages and collect them to ontimer method on missing element timeout expiry and how to collect this data in sideoutputs and run a batch process over side output. Few code examples would be appreciated.

Thanks,
Jaswin

Re: Flink TTL for MapStates and Sideoutputs implementations

Posted by Jaswin Shah <ja...@outlook.com>.
Thanks for responding Alexander.
We have solved the problem now with ValueState now. Basically, here we are implementing outer join logic with custom keyedCoprocessFunction implementations.

________________________________
From: Alexander Fedulov <al...@ververica.com>
Sent: 28 May 2020 17:24
To: Jaswin Shah <ja...@outlook.com>
Cc: user@flink.apache.org <us...@flink.apache.org>
Subject: Re: Flink TTL for MapStates and Sideoutputs implementations

Hi Jaswin,

I would like to clarify something first - what do you key your streams by, when joining them?
It seems that what you want to do is to match each CartMessage with a corresponding Payment that has the same orderId+mid. If this is the case, you probably do not need the MapState in the first place.

Best,

--

Alexander Fedulov | Solutions Architect


[https://lh6.googleusercontent.com/BAYfe7E1EKlpcT1zGwlMWJEsZuwEv9KelOYQzIst9quO5oFdNebAja2EAsrJFipxig9u9ErB_5Tg2SQGSdLJo8lD3udSPG-uKope43NFO8lRMix-oMJSqwJLz9gOK8YtADdFSvR7]<https://www.ververica.com/>


Follow us @VervericaData

--

Join Flink Forward<https://flink-forward.org/> - The Apache Flink Conference

Stream Processing | Event Driven | Real Time


On Fri, May 22, 2020 at 8:57 AM Jaswin Shah <ja...@outlook.com>> wrote:

public class CartPGCoprocessFunction extends KeyedCoProcessFunction<String,CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {

    private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class);

    /**
     * Map state for cart messages, orderId+mid is key and cartMessage is value.
     */
    private static MapState<String, CartMessage> cartState = null;

    /**
     * Map state for pg messages, orderId+mid is key and pgMessage is value.
     */
    private static MapState<String, PaymentNotifyRequestWrapper> pgState = null;

    /**
     * Intializations for cart and pg mapStates
     *
     * @param config
     */
    @Override
    public void open(Configuration config) {
        MapStateDescriptor<String, CartMessage> cartStateDescriptor = new MapStateDescriptor<> (
            Constants.CART_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(CartMessage.class)
        );
        cartState = getRuntimeContext().getMapState(cartStateDescriptor);

        MapStateDescriptor<String, PaymentNotifyRequestWrapper> pgStateDescriptor = new MapStateDescriptor<>(
            Constants.PG_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(PaymentNotifyRequestWrapper.class)
        );
        pgState = getRuntimeContext().getMapState(pgStateDescriptor);
    }

    /**
     * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from pgMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in cartMapState.
     * @param cartMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        String searchKey = cartMessage.createJoinStringCondition();
        PaymentNotifyRequestWrapper paymentNotifyObject = pgState.get(searchKey);
        if(Objects.nonNull(paymentNotifyObject)) {
            generateResultMessage(cartMessage,paymentNotifyObject,collector);
            pgState.remove(searchKey);
        } else {
            cartState.put(searchKey,cartMessage);
        }
    }

    /**
     * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from cartMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in pgMapState.
     * @param pgMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        String searchKey = pgMessage.createJoinStringCondition();
        CartMessage cartMessage = cartState.get(searchKey);
        if(Objects.nonNull(cartMessage)) {
            generateResultMessage(cartMessage,pgMessage,collector);
            cartState.remove(searchKey);
        } else {
            pgState.put(searchKey,pgMessage);
        }
    }

    /**
     * Create ResultMessage from cart and pg messages.
     *
     * @param cartMessage
     * @param pgMessage
     * @return
     */
    private void generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper pgMessage,Collector<ResultMessage> collector) {
        ResultMessage resultMessage = new ResultMessage();
        Payment payment = null;

        //Logic should be in cart: check
        for (Payment pay : cartMessage.getPayments()) {
            if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG()) && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
                payment = pay;
                break;
            }
        }
        if(Objects.isNull(payment)) {
            return;
        }

        resultMessage.setOrderId(cartMessage.getId());
        resultMessage.setMid(payment.getMid());

        resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
        resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());

        resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
        resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());

        resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
        resultMessage.setCartOrderAmount(String.valueOf(cartMessage.getGrandtotal().longValue()));

        resultMessage.setCartPaymethod(payment.getPayment_method());
        resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());

        checkDescripancyAndCollectResult(resultMessage,collector);
    }

    /**
     * Evaluate if there is descripancy of any fields between the messages from two different systems.
     * Write all the descripancy logic here.
     *
     * @param resultMessage
     */
    private void checkDescripancyAndCollectResult(ResultMessage resultMessage, Collector<ResultMessage> collector) {

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!StringUtils.equals(resultMessage.getCartOrderAmount(), resultMessage.getPgOrderAmount())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }
    }
}

Hi,
I have implemented the flink job with MapStates. The functionality is like,

  1.  I have two datastreams which I connect with connect operator and then call coprocessfunction with every pair of objects.
  2.  For element of first datastream, processElement1 method is called and for an element of second datastream, processElement2 method is called.
  3.  I have two MapStates in CoProcessFunction for both streams separately.
  4.  When processElement1 is called, it checks in MapState2 if corresponding element with given id is present, if present, I match, and delete. If not present, I add the object in MapState1.
  5.  When processElement2 is called, it checks in MapState1 if corresponding element with given id is present, if present, I match and delete. I fnot present I add object in MapState2.
  6.  Now, I want all the state data to be stored in Rocksdb.
  7.  After few days, I want to run a batch streaming job on Rocksdb to check if there are any objects which have not match found to create a report of those.

I need a help on how to configure TTL for messages and collect them to ontimer method on missing element timeout expiry and how to collect this data in sideoutputs and run a batch process over side output. Few code examples would be appreciated.

Thanks,
Jaswin

Re: Flink TTL for MapStates and Sideoutputs implementations

Posted by Alexander Fedulov <al...@ververica.com>.
Hi Jaswin,

I would like to clarify something first - what do you key your streams by,
when joining them?
It seems that what you want to do is to match each CartMessage with a
corresponding Payment that has the same orderId+mid. If this is the case,
you probably do not need the MapState in the first place.

Best,

--

Alexander Fedulov | Solutions Architect

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time


On Fri, May 22, 2020 at 8:57 AM Jaswin Shah <ja...@outlook.com> wrote:

> public class CartPGCoprocessFunction extends KeyedCoProcessFunction<String,CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {
>
>     private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class);
>
>     /**
>      * Map state for cart messages, orderId+mid is key and cartMessage is value.
>      */
>     private static MapState<String, CartMessage> cartState = null;
>
>     /**
>      * Map state for pg messages, orderId+mid is key and pgMessage is value.
>      */
>     private static MapState<String, PaymentNotifyRequestWrapper> pgState = null;
>
>     /**
>      * Intializations for cart and pg mapStates
>      *
>      * @param config
>      */
>     @Override
>     public void open(Configuration config) {
>         MapStateDescriptor<String, CartMessage> cartStateDescriptor = new MapStateDescriptor<> (
>             Constants.CART_DATA,
>             TypeInformation.of(String.class),
>             TypeInformation.of(CartMessage.class)
>         );
>         cartState = getRuntimeContext().getMapState(cartStateDescriptor);
>
>         MapStateDescriptor<String, PaymentNotifyRequestWrapper> pgStateDescriptor = new MapStateDescriptor<>(
>             Constants.PG_DATA,
>             TypeInformation.of(String.class),
>             TypeInformation.of(PaymentNotifyRequestWrapper.class)
>         );
>         pgState = getRuntimeContext().getMapState(pgStateDescriptor);
>     }
>
>     /**
>      * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present.
>      * 2. If present, match, checkDescripancy, process and delete entry from pgMapState.
>      * 3. If not present, add orderId+mid as key and cart object as value in cartMapState.
>      * @param cartMessage
>      * @param context
>      * @param collector
>      * @throws Exception
>      */
>     @Override
>     public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector) throws Exception {
>         String searchKey = cartMessage.createJoinStringCondition();
>         PaymentNotifyRequestWrapper paymentNotifyObject = pgState.get(searchKey);
>         if(Objects.nonNull(paymentNotifyObject)) {
>             generateResultMessage(cartMessage,paymentNotifyObject,collector);
>             pgState.remove(searchKey);
>         } else {
>             cartState.put(searchKey,cartMessage);
>         }
>     }
>
>     /**
>      * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
>      * 2. If present, match, checkDescripancy, process and delete entry from cartMapState.
>      * 3. If not present, add orderId+mid as key and cart object as value in pgMapState.
>      * @param pgMessage
>      * @param context
>      * @param collector
>      * @throws Exception
>      */
>     @Override
>     public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context, Collector<ResultMessage> collector) throws Exception {
>         String searchKey = pgMessage.createJoinStringCondition();
>         CartMessage cartMessage = cartState.get(searchKey);
>         if(Objects.nonNull(cartMessage)) {
>             generateResultMessage(cartMessage,pgMessage,collector);
>             cartState.remove(searchKey);
>         } else {
>             pgState.put(searchKey,pgMessage);
>         }
>     }
>
>     /**
>      * Create ResultMessage from cart and pg messages.
>      *
>      * @param cartMessage
>      * @param pgMessage
>      * @return
>      */
>     private void generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper pgMessage,Collector<ResultMessage> collector) {
>         ResultMessage resultMessage = new ResultMessage();
>         Payment payment = null;
>
>         //Logic should be in cart: check
>         for (Payment pay : cartMessage.getPayments()) {
>             if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG()) && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
>                 payment = pay;
>                 break;
>             }
>         }
>         if(Objects.isNull(payment)) {
>             return;
>         }
>
>         resultMessage.setOrderId(cartMessage.getId());
>         resultMessage.setMid(payment.getMid());
>
>         resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
>         resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());
>
>         resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
>         resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());
>
>         resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
>         resultMessage.setCartOrderAmount(String.valueOf(cartMessage.getGrandtotal().longValue()));
>
>         resultMessage.setCartPaymethod(payment.getPayment_method());
>         resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());
>
>         checkDescripancyAndCollectResult(resultMessage,collector);
>     }
>
>     /**
>      * Evaluate if there is descripancy of any fields between the messages from two different systems.
>      * Write all the descripancy logic here.
>      *
>      * @param resultMessage
>      */
>     private void checkDescripancyAndCollectResult(ResultMessage resultMessage, Collector<ResultMessage> collector) {
>
>         if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
>             resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
>             collector.collect(resultMessage.clone());
>         }
>
>         if (!StringUtils.equals(resultMessage.getCartOrderAmount(), resultMessage.getPgOrderAmount())) {
>             resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
>             collector.collect(resultMessage.clone());
>         }
>
>         if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
>             resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
>             collector.collect(resultMessage.clone());
>         }
>     }
> }
>
> Hi,
> I have implemented the flink job with MapStates. The functionality is
> like,
>
>    1. I have two datastreams which I connect with connect operator and
>    then call coprocessfunction with every pair of objects.
>    2. For element of first datastream, processElement1 method is called
>    and for an element of second datastream, processElement2 method is called.
>    3. I have two MapStates in CoProcessFunction for both streams
>    separately.
>    4. When processElement1 is called, it checks in MapState2 if
>    corresponding element with given id is present, if present, I match, and
>    delete. If not present, I add the object in MapState1.
>    5. When processElement2 is called, it checks in MapState1 if
>    corresponding element with given id is present, if present, I match and
>    delete. I fnot present I add object in MapState2.
>    6. Now, I want all the state data to be stored in Rocksdb.
>    7. After few days, I want to run a batch streaming job on Rocksdb to
>    check if there are any objects which have not match found to create a
>    report of those.
>
>
> I need a help on how to configure TTL for messages and collect them to
> ontimer method on missing element timeout expiry and how to collect this
> data in sideoutputs and run a batch process over side output. Few code
> examples would be appreciated.
>
> Thanks,
> Jaswin
>