You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Jaswin Shah <ja...@outlook.com> on 2020/05/22 15:00:20 UTC

onTimer method in CoProcessFunction in flink

How can I identify the type of element for which onTimer is called in Flink?
I want to send the objects for which onTimer is called to side outputs and then stream the side-output data out to a Kafka topic. I do not understand how to stream out the side-output data, i.e. where I should write that processing logic. Below is the code I have written so far:


/**
 * Keyed co-process function that joins cart messages with payment-gateway (pg) messages coming
 * from the two inputs of a connected stream, and emits one {@link ResultMessage} per discrepancy
 * found between a matched pair.
 *
 * <p>Whichever side arrives first is buffered in map state under its join key until the
 * counterpart arrives.
 *
 * @author jaswin.shah
 * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
 */
public class CartPGCoprocessFunction extends KeyedCoProcessFunction<String,CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {

    private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class);

    /** Offset in milliseconds (one hour) added to an element's timestamp to get its timer time. */
    private static final long TIMER_DELAY_MS = 3600000L;

    /**
     * Map state for cart messages, orderId+mid is key and cartMessage is value.
     * Kept per instance (not static) so that parallel instances of this function running in the
     * same JVM do not overwrite each other's state handle in {@link #open(Configuration)}.
     */
    private transient MapState<String, CartMessage> cartState;

    /**
     * Map state for pg messages, orderId+mid is key and pgMessage is value.
     * Kept per instance (not static) for the same reason as {@link #cartState}.
     */
    private transient MapState<String, PaymentNotifyRequestWrapper> pgState;


    /**
     * Initializes the cart and pg map states from the runtime context.
     *
     * @param config configuration passed by the runtime; not read here
     */
    @Override
    public void open(Configuration config) {
        MapStateDescriptor<String, CartMessage> cartStateDescriptor = new MapStateDescriptor<> (
            Constants.CART_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(CartMessage.class)
        );
        cartState = getRuntimeContext().getMapState(cartStateDescriptor);

        MapStateDescriptor<String, PaymentNotifyRequestWrapper> pgStateDescriptor = new MapStateDescriptor<>(
            Constants.PG_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(PaymentNotifyRequestWrapper.class)
        );
        pgState = getRuntimeContext().getMapState(pgStateDescriptor);
    }

    /**
     * Called when a timer registered by one of the processElement methods fires.
     *
     * <p>Currently a no-op. NOTE(review): entries buffered for unmatched messages are never
     * removed from the map states, so this is presumably where expired entries should be evicted
     * (and, if wanted, emitted through {@code ctx.output(...)} to a side output) - confirm the
     * intended behavior before implementing.
     *
     * @param timestamp timestamp of the firing timer
     * @param ctx timer context
     * @param out collector for main-output results
     * @throws Exception declared by the overridden method; nothing is thrown here
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultMessage> out) throws Exception {

    }

    /**
     * 1. Register an event-time timer one hour after the element's timestamp.
     * 2. Get orderId+mid from cartMessage and check in pgMapState if an entry is present.
     * 3. If present, match, checkDescripancy, process and delete entry from pgMapState.
     * 4. If not present, add orderId+mid as key and cart object as value in cartMapState.
     *
     * @param cartMessage cart message from the first input
     * @param context element context providing the timestamp and the timer service
     * @param collector collector for discrepancy results
     * @throws Exception if state access or result generation fails
     */
    @Override
    public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        // NOTE(review): context.timestamp() is unboxed here; it would fail if the element carries
        // no timestamp - confirm timestamps/watermarks are assigned upstream.
        context.timerService().registerEventTimeTimer(context.timestamp() + TIMER_DELAY_MS);

        String searchKey = cartMessage.createJoinStringCondition();
        PaymentNotifyRequestWrapper paymentNotifyObject = pgState.get(searchKey);
        if (Objects.nonNull(paymentNotifyObject)) {
            generateResultMessage(cartMessage, paymentNotifyObject, collector);
            pgState.remove(searchKey);
        } else {
            cartState.put(searchKey, cartMessage);
        }
    }

    /**
     * 1. Register an event-time timer one hour after the element's timestamp.
     * 2. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
     * 3. If present, match, checkDescripancy, process and delete entry from cartMapState.
     * 4. If not present, add orderId+mid as key and pg object as value in pgMapState.
     *
     * @param pgMessage pg message from the second input
     * @param context element context providing the timestamp and the timer service
     * @param collector collector for discrepancy results
     * @throws Exception if state access or result generation fails
     */
    @Override
    public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        // NOTE(review): same unboxing caveat as in processElement1.
        context.timerService().registerEventTimeTimer(context.timestamp() + TIMER_DELAY_MS);

        String searchKey = pgMessage.createJoinStringCondition();
        CartMessage cartMessage = cartState.get(searchKey);
        if (Objects.nonNull(cartMessage)) {
            generateResultMessage(cartMessage, pgMessage, collector);
            cartState.remove(searchKey);
        } else {
            pgState.put(searchKey, pgMessage);
        }
    }

    /**
     * Returns the first payment of the cart whose pg payment type equals
     * {@code Constants.FORWARD_PAYMENT} and whose provider equals
     * {@code Constants.PAYTM_NEW_PROVIDER}.
     *
     * @param cartMessage cart message whose payments are searched
     * @return the first matching payment, or {@code null} if there is none
     */
    private Payment findForwardPaytmPayment(CartMessage cartMessage) {
        //Logic should be in cart: check
        for (Payment pay : cartMessage.getPayments()) {
            if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG())
                    && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
                return pay;
            }
        }
        return null;
    }

    /**
     * Builds a ResultMessage from a matched cart/pg pair and passes it on for discrepancy
     * checking. Nothing is emitted when the cart has no qualifying payment.
     *
     * @param cartMessage matched cart message
     * @param pgMessage matched pg message
     * @param collector collector that receives the discrepancy results
     */
    private void generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper pgMessage,Collector<ResultMessage> collector) {
        Payment payment = findForwardPaytmPayment(cartMessage);
        if (Objects.isNull(payment)) {
            return;
        }

        // Only build the result once we know there is a payment to compare against.
        ResultMessage resultMessage = new ResultMessage();

        resultMessage.setOrderId(cartMessage.getId());
        resultMessage.setMid(payment.getMid());

        resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
        resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());

        resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
        resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());

        resultMessage.setPgOrderAmount(Double.valueOf(pgMessage.getOrderAmount().getValue()));
        resultMessage.setCartOrderAmount(cartMessage.getGrandtotal());

        logger.info("cart amount {} pg amount {} ",resultMessage.getCartOrderAmount(),resultMessage.getPgOrderAmount());
        resultMessage.setCartPaymethod(payment.getPayment_method());
        // NOTE(review): takes the first pay option only and assumes the array is non-empty.
        resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());

        checkDescripancyAndCollectResult(resultMessage, collector);
    }

    /**
     * Evaluate if there is descripancy of any fields between the messages from two different systems.
     * Write all the descripancy logic here.
     *
     * <p>For each mismatching field (order status, amount, pay method) the descripancy type is set
     * on {@code resultMessage} and a clone is emitted, so one pair can produce up to three results.
     *
     * @param resultMessage result populated with the values of both systems; its descripancy type
     *                      is overwritten by each failing check
     * @param collector collector that receives one clone per descripancy
     */
    private void checkDescripancyAndCollectResult(ResultMessage resultMessage, Collector<ResultMessage> collector) {

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        // Objects.equals is null-safe: a missing cart amount is reported as a descripancy
        // instead of failing with a NullPointerException.
        if (!Objects.equals(resultMessage.getCartOrderAmount(), resultMessage.getPgOrderAmount())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }
    }
}


Help will be highly appreciated.

回复:Re: onTimer method in CoProcessFunction in flink

Posted by Yun Gao <yu...@aliyun.com>.
Hi Jaswin,
    I think the event-time timer and the processing-time timer in Flink should be considered fully decoupled: the event-time timer is triggered by the watermarks received, and the processing-time timer is triggered by the physical clock. You can think of them as two separate timelines with no guarantee on their relative speed. Therefore, I think the result of computing the deadline from event time and registering it as a processing-time timer is nondeterministic; it depends on the gap between event time and processing time.

Best,
 Yun



 ------------------原始邮件 ------------------
发件人:Jaswin Shah <ja...@outlook.com>
发送时间:Sat May 23 22:08:57 2020
收件人:user@flink.apache.org <us...@flink.apache.org>, Arvid Heise <ar...@ververica.com>, Yun Tang <my...@live.com>
主题:Re: onTimer method in CoProcessFunction in flink

Hi Yun,
Actually this problem is solved now. I am now stuck on another problem with the timeout callbacks. I am receiving the callbacks too early, and the event-time timer registration was somehow failing; maybe it needs some special handling. I need to know whether this callback registration is wrong or whether something else is wrong.
Do we need some special handling when using event-time semantics?
Thanks,
Jaswin

From: Jaswin Shah <ja...@outlook.com>
Sent: 22 May 2020 20:30
To: user@flink.apache.org <us...@flink.apache.org>; Arvid Heise <ar...@ververica.com>; Yun Tang <my...@live.com>
Subject: onTimer method in CoProcessFunction in flink
How can I identify the type of element for which onTimer is called in Flink?
I want to send the objects for which onTimer is called to side outputs and then stream the side-output data out to a Kafka topic. I do not understand how to stream out the side-output data, i.e. where I should write that processing logic. Below is the code I have written so far:


/**
 * Keyed co-process function that joins cart messages with payment-gateway (pg) messages coming
 * from the two inputs of a connected stream, and emits one {@link ResultMessage} per discrepancy
 * found between a matched pair.
 *
 * <p>Whichever side arrives first is buffered in map state under its join key until the
 * counterpart arrives.
 *
 * @author jaswin.shah
 * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
 */
public class CartPGCoprocessFunction extends KeyedCoProcessFunction<String,CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {

    private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class);

    /** Offset in milliseconds (one hour) added to an element's timestamp to get its timer time. */
    private static final long TIMER_DELAY_MS = 3600000L;

    /**
     * Map state for cart messages, orderId+mid is key and cartMessage is value.
     * Kept per instance (not static) so that parallel instances of this function running in the
     * same JVM do not overwrite each other's state handle in {@link #open(Configuration)}.
     */
    private transient MapState<String, CartMessage> cartState;

    /**
     * Map state for pg messages, orderId+mid is key and pgMessage is value.
     * Kept per instance (not static) for the same reason as {@link #cartState}.
     */
    private transient MapState<String, PaymentNotifyRequestWrapper> pgState;


    /**
     * Initializes the cart and pg map states from the runtime context.
     *
     * @param config configuration passed by the runtime; not read here
     */
    @Override
    public void open(Configuration config) {
        MapStateDescriptor<String, CartMessage> cartStateDescriptor = new MapStateDescriptor<> (
            Constants.CART_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(CartMessage.class)
        );
        cartState = getRuntimeContext().getMapState(cartStateDescriptor);

        MapStateDescriptor<String, PaymentNotifyRequestWrapper> pgStateDescriptor = new MapStateDescriptor<>(
            Constants.PG_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(PaymentNotifyRequestWrapper.class)
        );
        pgState = getRuntimeContext().getMapState(pgStateDescriptor);
    }

    /**
     * Called when a timer registered by one of the processElement methods fires.
     *
     * <p>Currently a no-op. NOTE(review): entries buffered for unmatched messages are never
     * removed from the map states, so this is presumably where expired entries should be evicted
     * (and, if wanted, emitted through {@code ctx.output(...)} to a side output) - confirm the
     * intended behavior before implementing.
     *
     * @param timestamp timestamp of the firing timer
     * @param ctx timer context
     * @param out collector for main-output results
     * @throws Exception declared by the overridden method; nothing is thrown here
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultMessage> out) throws Exception {

    }

    /**
     * 1. Register an event-time timer one hour after the element's timestamp.
     * 2. Get orderId+mid from cartMessage and check in pgMapState if an entry is present.
     * 3. If present, match, checkDescripancy, process and delete entry from pgMapState.
     * 4. If not present, add orderId+mid as key and cart object as value in cartMapState.
     *
     * @param cartMessage cart message from the first input
     * @param context element context providing the timestamp and the timer service
     * @param collector collector for discrepancy results
     * @throws Exception if state access or result generation fails
     */
    @Override
    public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        // NOTE(review): context.timestamp() is unboxed here; it would fail if the element carries
        // no timestamp - confirm timestamps/watermarks are assigned upstream.
        context.timerService().registerEventTimeTimer(context.timestamp() + TIMER_DELAY_MS);

        String searchKey = cartMessage.createJoinStringCondition();
        PaymentNotifyRequestWrapper paymentNotifyObject = pgState.get(searchKey);
        if (Objects.nonNull(paymentNotifyObject)) {
            generateResultMessage(cartMessage, paymentNotifyObject, collector);
            pgState.remove(searchKey);
        } else {
            cartState.put(searchKey, cartMessage);
        }
    }

    /**
     * 1. Register an event-time timer one hour after the element's timestamp.
     * 2. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
     * 3. If present, match, checkDescripancy, process and delete entry from cartMapState.
     * 4. If not present, add orderId+mid as key and pg object as value in pgMapState.
     *
     * @param pgMessage pg message from the second input
     * @param context element context providing the timestamp and the timer service
     * @param collector collector for discrepancy results
     * @throws Exception if state access or result generation fails
     */
    @Override
    public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        // NOTE(review): same unboxing caveat as in processElement1.
        context.timerService().registerEventTimeTimer(context.timestamp() + TIMER_DELAY_MS);

        String searchKey = pgMessage.createJoinStringCondition();
        CartMessage cartMessage = cartState.get(searchKey);
        if (Objects.nonNull(cartMessage)) {
            generateResultMessage(cartMessage, pgMessage, collector);
            cartState.remove(searchKey);
        } else {
            pgState.put(searchKey, pgMessage);
        }
    }

    /**
     * Returns the first payment of the cart whose pg payment type equals
     * {@code Constants.FORWARD_PAYMENT} and whose provider equals
     * {@code Constants.PAYTM_NEW_PROVIDER}.
     *
     * @param cartMessage cart message whose payments are searched
     * @return the first matching payment, or {@code null} if there is none
     */
    private Payment findForwardPaytmPayment(CartMessage cartMessage) {
        //Logic should be in cart: check
        for (Payment pay : cartMessage.getPayments()) {
            if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG())
                    && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
                return pay;
            }
        }
        return null;
    }

    /**
     * Builds a ResultMessage from a matched cart/pg pair and passes it on for discrepancy
     * checking. Nothing is emitted when the cart has no qualifying payment.
     *
     * @param cartMessage matched cart message
     * @param pgMessage matched pg message
     * @param collector collector that receives the discrepancy results
     */
    private void generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper pgMessage,Collector<ResultMessage> collector) {
        Payment payment = findForwardPaytmPayment(cartMessage);
        if (Objects.isNull(payment)) {
            return;
        }

        // Only build the result once we know there is a payment to compare against.
        ResultMessage resultMessage = new ResultMessage();

        resultMessage.setOrderId(cartMessage.getId());
        resultMessage.setMid(payment.getMid());

        resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
        resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());

        resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
        resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());

        resultMessage.setPgOrderAmount(Double.valueOf(pgMessage.getOrderAmount().getValue()));
        resultMessage.setCartOrderAmount(cartMessage.getGrandtotal());

        logger.info("cart amount {} pg amount {} ",resultMessage.getCartOrderAmount(),resultMessage.getPgOrderAmount());
        resultMessage.setCartPaymethod(payment.getPayment_method());
        // NOTE(review): takes the first pay option only and assumes the array is non-empty.
        resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());

        checkDescripancyAndCollectResult(resultMessage, collector);
    }

    /**
     * Evaluate if there is descripancy of any fields between the messages from two different systems.
     * Write all the descripancy logic here.
     *
     * <p>For each mismatching field (order status, amount, pay method) the descripancy type is set
     * on {@code resultMessage} and a clone is emitted, so one pair can produce up to three results.
     *
     * @param resultMessage result populated with the values of both systems; its descripancy type
     *                      is overwritten by each failing check
     * @param collector collector that receives one clone per descripancy
     */
    private void checkDescripancyAndCollectResult(ResultMessage resultMessage, Collector<ResultMessage> collector) {

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        // Objects.equals is null-safe: a missing cart amount is reported as a descripancy
        // instead of failing with a NullPointerException.
        if (!Objects.equals(resultMessage.getCartOrderAmount(), resultMessage.getPgOrderAmount())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }
    }
}


Help will be highly appreciated.

Re: onTimer method in CoProcessFunction in flink

Posted by Jaswin Shah <ja...@outlook.com>.
Hi Yun,
Actually this problem is solved now. I am now stuck on another problem with the timeout callbacks. I am receiving the callbacks too early, and the event-time timer registration was somehow failing; maybe it needs some special handling. I need to know whether this callback registration is wrong or whether something else is wrong.
Do we need some special handling when using event-time semantics?
Thanks,
Jaswin
________________________________
From: Jaswin Shah <ja...@outlook.com>
Sent: 22 May 2020 20:30
To: user@flink.apache.org <us...@flink.apache.org>; Arvid Heise <ar...@ververica.com>; Yun Tang <my...@live.com>
Subject: onTimer method in CoProcessFunction in flink

How can I identify the type of element for which onTimer is called in Flink?
I want to send the objects for which onTimer is called to side outputs and then stream the side-output data out to a Kafka topic. I do not understand how to stream out the side-output data, i.e. where I should write that processing logic. Below is the code I have written so far:


/**
 * Keyed co-process function that joins cart messages with payment-gateway (pg) messages coming
 * from the two inputs of a connected stream, and emits one {@link ResultMessage} per discrepancy
 * found between a matched pair.
 *
 * <p>Whichever side arrives first is buffered in map state under its join key until the
 * counterpart arrives.
 *
 * @author jaswin.shah
 * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
 */
public class CartPGCoprocessFunction extends KeyedCoProcessFunction<String,CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {

    private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class);

    /** Offset in milliseconds (one hour) added to an element's timestamp to get its timer time. */
    private static final long TIMER_DELAY_MS = 3600000L;

    /**
     * Map state for cart messages, orderId+mid is key and cartMessage is value.
     * Kept per instance (not static) so that parallel instances of this function running in the
     * same JVM do not overwrite each other's state handle in {@link #open(Configuration)}.
     */
    private transient MapState<String, CartMessage> cartState;

    /**
     * Map state for pg messages, orderId+mid is key and pgMessage is value.
     * Kept per instance (not static) for the same reason as {@link #cartState}.
     */
    private transient MapState<String, PaymentNotifyRequestWrapper> pgState;


    /**
     * Initializes the cart and pg map states from the runtime context.
     *
     * @param config configuration passed by the runtime; not read here
     */
    @Override
    public void open(Configuration config) {
        MapStateDescriptor<String, CartMessage> cartStateDescriptor = new MapStateDescriptor<> (
            Constants.CART_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(CartMessage.class)
        );
        cartState = getRuntimeContext().getMapState(cartStateDescriptor);

        MapStateDescriptor<String, PaymentNotifyRequestWrapper> pgStateDescriptor = new MapStateDescriptor<>(
            Constants.PG_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(PaymentNotifyRequestWrapper.class)
        );
        pgState = getRuntimeContext().getMapState(pgStateDescriptor);
    }

    /**
     * Called when a timer registered by one of the processElement methods fires.
     *
     * <p>Currently a no-op. NOTE(review): entries buffered for unmatched messages are never
     * removed from the map states, so this is presumably where expired entries should be evicted
     * (and, if wanted, emitted through {@code ctx.output(...)} to a side output) - confirm the
     * intended behavior before implementing.
     *
     * @param timestamp timestamp of the firing timer
     * @param ctx timer context
     * @param out collector for main-output results
     * @throws Exception declared by the overridden method; nothing is thrown here
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultMessage> out) throws Exception {

    }

    /**
     * 1. Register an event-time timer one hour after the element's timestamp.
     * 2. Get orderId+mid from cartMessage and check in pgMapState if an entry is present.
     * 3. If present, match, checkDescripancy, process and delete entry from pgMapState.
     * 4. If not present, add orderId+mid as key and cart object as value in cartMapState.
     *
     * @param cartMessage cart message from the first input
     * @param context element context providing the timestamp and the timer service
     * @param collector collector for discrepancy results
     * @throws Exception if state access or result generation fails
     */
    @Override
    public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        // NOTE(review): context.timestamp() is unboxed here; it would fail if the element carries
        // no timestamp - confirm timestamps/watermarks are assigned upstream.
        context.timerService().registerEventTimeTimer(context.timestamp() + TIMER_DELAY_MS);

        String searchKey = cartMessage.createJoinStringCondition();
        PaymentNotifyRequestWrapper paymentNotifyObject = pgState.get(searchKey);
        if (Objects.nonNull(paymentNotifyObject)) {
            generateResultMessage(cartMessage, paymentNotifyObject, collector);
            pgState.remove(searchKey);
        } else {
            cartState.put(searchKey, cartMessage);
        }
    }

    /**
     * 1. Register an event-time timer one hour after the element's timestamp.
     * 2. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
     * 3. If present, match, checkDescripancy, process and delete entry from cartMapState.
     * 4. If not present, add orderId+mid as key and pg object as value in pgMapState.
     *
     * @param pgMessage pg message from the second input
     * @param context element context providing the timestamp and the timer service
     * @param collector collector for discrepancy results
     * @throws Exception if state access or result generation fails
     */
    @Override
    public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        // NOTE(review): same unboxing caveat as in processElement1.
        context.timerService().registerEventTimeTimer(context.timestamp() + TIMER_DELAY_MS);

        String searchKey = pgMessage.createJoinStringCondition();
        CartMessage cartMessage = cartState.get(searchKey);
        if (Objects.nonNull(cartMessage)) {
            generateResultMessage(cartMessage, pgMessage, collector);
            cartState.remove(searchKey);
        } else {
            pgState.put(searchKey, pgMessage);
        }
    }

    /**
     * Returns the first payment of the cart whose pg payment type equals
     * {@code Constants.FORWARD_PAYMENT} and whose provider equals
     * {@code Constants.PAYTM_NEW_PROVIDER}.
     *
     * @param cartMessage cart message whose payments are searched
     * @return the first matching payment, or {@code null} if there is none
     */
    private Payment findForwardPaytmPayment(CartMessage cartMessage) {
        //Logic should be in cart: check
        for (Payment pay : cartMessage.getPayments()) {
            if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG())
                    && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
                return pay;
            }
        }
        return null;
    }

    /**
     * Builds a ResultMessage from a matched cart/pg pair and passes it on for discrepancy
     * checking. Nothing is emitted when the cart has no qualifying payment.
     *
     * @param cartMessage matched cart message
     * @param pgMessage matched pg message
     * @param collector collector that receives the discrepancy results
     */
    private void generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper pgMessage,Collector<ResultMessage> collector) {
        Payment payment = findForwardPaytmPayment(cartMessage);
        if (Objects.isNull(payment)) {
            return;
        }

        // Only build the result once we know there is a payment to compare against.
        ResultMessage resultMessage = new ResultMessage();

        resultMessage.setOrderId(cartMessage.getId());
        resultMessage.setMid(payment.getMid());

        resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
        resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());

        resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
        resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());

        resultMessage.setPgOrderAmount(Double.valueOf(pgMessage.getOrderAmount().getValue()));
        resultMessage.setCartOrderAmount(cartMessage.getGrandtotal());

        logger.info("cart amount {} pg amount {} ",resultMessage.getCartOrderAmount(),resultMessage.getPgOrderAmount());
        resultMessage.setCartPaymethod(payment.getPayment_method());
        // NOTE(review): takes the first pay option only and assumes the array is non-empty.
        resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());

        checkDescripancyAndCollectResult(resultMessage, collector);
    }

    /**
     * Evaluate if there is descripancy of any fields between the messages from two different systems.
     * Write all the descripancy logic here.
     *
     * <p>For each mismatching field (order status, amount, pay method) the descripancy type is set
     * on {@code resultMessage} and a clone is emitted, so one pair can produce up to three results.
     *
     * @param resultMessage result populated with the values of both systems; its descripancy type
     *                      is overwritten by each failing check
     * @param collector collector that receives one clone per descripancy
     */
    private void checkDescripancyAndCollectResult(ResultMessage resultMessage, Collector<ResultMessage> collector) {

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        // Objects.equals is null-safe: a missing cart amount is reported as a descripancy
        // instead of failing with a NullPointerException.
        if (!Objects.equals(resultMessage.getCartOrderAmount(), resultMessage.getPgOrderAmount())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }
    }
}


Help will be highly appreciated.

回复:onTimer method in CoProcessFunction in flink

Posted by Yun Gao <yu...@aliyun.com>.
Hi Jaswin,

     If I understand correctly, I think you could add the logic in the onTimer callback. In this callback, OnTimerContext.output(outputTag, xx) could be used to output data to a specific side output. Besides, you would need a new state to store the elements to be output in the onTimer callback. A similar example can be found in [1].

Best,
 Yun
    ​ 

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#example




 ------------------原始邮件 ------------------
发件人:Jaswin Shah <ja...@outlook.com>
发送时间:Fri May 22 23:00:43 2020
收件人:user@flink.apache.org <us...@flink.apache.org>, Arvid Heise <ar...@ververica.com>, Yun Tang <my...@live.com>
主题:onTimer method in CoProcessFunction in flink

How can I identify the type of element for which onTimer is called in Flink?
I want to send the objects for which onTimer is called to side outputs and then stream the side-output data out to a Kafka topic. I do not understand how to stream out the side-output data, i.e. where I should write that processing logic. Below is the code I have written so far:


/**
 * Keyed co-process function that joins cart messages with payment-gateway (pg) messages coming
 * from the two inputs of a connected stream, and emits one {@link ResultMessage} per discrepancy
 * found between a matched pair.
 *
 * <p>Whichever side arrives first is buffered in map state under its join key until the
 * counterpart arrives.
 *
 * @author jaswin.shah
 * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
 */
public class CartPGCoprocessFunction extends KeyedCoProcessFunction<String,CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {

    private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class);

    /** Offset in milliseconds (one hour) added to an element's timestamp to get its timer time. */
    private static final long TIMER_DELAY_MS = 3600000L;

    /**
     * Map state for cart messages, orderId+mid is key and cartMessage is value.
     * Kept per instance (not static) so that parallel instances of this function running in the
     * same JVM do not overwrite each other's state handle in {@link #open(Configuration)}.
     */
    private transient MapState<String, CartMessage> cartState;

    /**
     * Map state for pg messages, orderId+mid is key and pgMessage is value.
     * Kept per instance (not static) for the same reason as {@link #cartState}.
     */
    private transient MapState<String, PaymentNotifyRequestWrapper> pgState;


    /**
     * Initializes the cart and pg map states from the runtime context.
     *
     * @param config configuration passed by the runtime; not read here
     */
    @Override
    public void open(Configuration config) {
        MapStateDescriptor<String, CartMessage> cartStateDescriptor = new MapStateDescriptor<> (
            Constants.CART_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(CartMessage.class)
        );
        cartState = getRuntimeContext().getMapState(cartStateDescriptor);

        MapStateDescriptor<String, PaymentNotifyRequestWrapper> pgStateDescriptor = new MapStateDescriptor<>(
            Constants.PG_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(PaymentNotifyRequestWrapper.class)
        );
        pgState = getRuntimeContext().getMapState(pgStateDescriptor);
    }

    /**
     * Called when a timer registered by one of the processElement methods fires.
     *
     * <p>Currently a no-op. NOTE(review): entries buffered for unmatched messages are never
     * removed from the map states, so this is presumably where expired entries should be evicted
     * (and, if wanted, emitted through {@code ctx.output(...)} to a side output) - confirm the
     * intended behavior before implementing.
     *
     * @param timestamp timestamp of the firing timer
     * @param ctx timer context
     * @param out collector for main-output results
     * @throws Exception declared by the overridden method; nothing is thrown here
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultMessage> out) throws Exception {

    }

    /**
     * 1. Register an event-time timer one hour after the element's timestamp.
     * 2. Get orderId+mid from cartMessage and check in pgMapState if an entry is present.
     * 3. If present, match, checkDescripancy, process and delete entry from pgMapState.
     * 4. If not present, add orderId+mid as key and cart object as value in cartMapState.
     *
     * @param cartMessage cart message from the first input
     * @param context element context providing the timestamp and the timer service
     * @param collector collector for discrepancy results
     * @throws Exception if state access or result generation fails
     */
    @Override
    public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        // NOTE(review): context.timestamp() is unboxed here; it would fail if the element carries
        // no timestamp - confirm timestamps/watermarks are assigned upstream.
        context.timerService().registerEventTimeTimer(context.timestamp() + TIMER_DELAY_MS);

        String searchKey = cartMessage.createJoinStringCondition();
        PaymentNotifyRequestWrapper paymentNotifyObject = pgState.get(searchKey);
        if (Objects.nonNull(paymentNotifyObject)) {
            generateResultMessage(cartMessage, paymentNotifyObject, collector);
            pgState.remove(searchKey);
        } else {
            cartState.put(searchKey, cartMessage);
        }
    }

    /**
     * 1. Register an event-time timer one hour after the element's timestamp.
     * 2. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
     * 3. If present, match, checkDescripancy, process and delete entry from cartMapState.
     * 4. If not present, add orderId+mid as key and pg object as value in pgMapState.
     *
     * @param pgMessage pg message from the second input
     * @param context element context providing the timestamp and the timer service
     * @param collector collector for discrepancy results
     * @throws Exception if state access or result generation fails
     */
    @Override
    public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        // NOTE(review): same unboxing caveat as in processElement1.
        context.timerService().registerEventTimeTimer(context.timestamp() + TIMER_DELAY_MS);

        String searchKey = pgMessage.createJoinStringCondition();
        CartMessage cartMessage = cartState.get(searchKey);
        if (Objects.nonNull(cartMessage)) {
            generateResultMessage(cartMessage, pgMessage, collector);
            cartState.remove(searchKey);
        } else {
            pgState.put(searchKey, pgMessage);
        }
    }

    /**
     * Returns the first payment of the cart whose pg payment type equals
     * {@code Constants.FORWARD_PAYMENT} and whose provider equals
     * {@code Constants.PAYTM_NEW_PROVIDER}.
     *
     * @param cartMessage cart message whose payments are searched
     * @return the first matching payment, or {@code null} if there is none
     */
    private Payment findForwardPaytmPayment(CartMessage cartMessage) {
        //Logic should be in cart: check
        for (Payment pay : cartMessage.getPayments()) {
            if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG())
                    && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
                return pay;
            }
        }
        return null;
    }

    /**
     * Builds a ResultMessage from a matched cart/pg pair and passes it on for discrepancy
     * checking. Nothing is emitted when the cart has no qualifying payment.
     *
     * @param cartMessage matched cart message
     * @param pgMessage matched pg message
     * @param collector collector that receives the discrepancy results
     */
    private void generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper pgMessage,Collector<ResultMessage> collector) {
        Payment payment = findForwardPaytmPayment(cartMessage);
        if (Objects.isNull(payment)) {
            return;
        }

        // Only build the result once we know there is a payment to compare against.
        ResultMessage resultMessage = new ResultMessage();

        resultMessage.setOrderId(cartMessage.getId());
        resultMessage.setMid(payment.getMid());

        resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
        resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());

        resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
        resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());

        resultMessage.setPgOrderAmount(Double.valueOf(pgMessage.getOrderAmount().getValue()));
        resultMessage.setCartOrderAmount(cartMessage.getGrandtotal());

        logger.info("cart amount {} pg amount {} ",resultMessage.getCartOrderAmount(),resultMessage.getPgOrderAmount());
        resultMessage.setCartPaymethod(payment.getPayment_method());
        // NOTE(review): takes the first pay option only and assumes the array is non-empty.
        resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());

        checkDescripancyAndCollectResult(resultMessage, collector);
    }

    /**
     * Evaluate if there is descripancy of any fields between the messages from two different systems.
     * Write all the descripancy logic here.
     *
     * <p>For each mismatching field (order status, amount, pay method) the descripancy type is set
     * on {@code resultMessage} and a clone is emitted, so one pair can produce up to three results.
     *
     * @param resultMessage result populated with the values of both systems; its descripancy type
     *                      is overwritten by each failing check
     * @param collector collector that receives one clone per descripancy
     */
    private void checkDescripancyAndCollectResult(ResultMessage resultMessage, Collector<ResultMessage> collector) {

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        // Objects.equals is null-safe: a missing cart amount is reported as a descripancy
        // instead of failing with a NullPointerException.
        if (!Objects.equals(resultMessage.getCartOrderAmount(), resultMessage.getPgOrderAmount())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }
    }
}


Help will be highly appreciated.