Posted to user@flink.apache.org by bujjirahul45 <ra...@gmail.com> on 2020/07/15 18:32:19 UTC

How to write JUnit test cases for a KeyedBroadcastProcessFunction

Hi,

I am new to Flink and am trying to write JUnit test cases for a
KeyedBroadcastProcessFunction. My code is below. The test currently calls the
getDataStreamOutput method in the TestUtils class, passing the input data and
the pattern rules. The input data is evaluated against the list of pattern
rules; if it satisfies a condition, the signal is emitted to a sink function,
and getDataStreamOutput returns the output data as a string.

    @Test
    public void testCompareInputAndOutputDataForInputSignal()
            throws Exception {
        Assertions.assertEquals(sampleInputSignal,
                TestUtils.getDataStreamOutput(
                        inputSignal,
                        patternRules));
    }



public static String getDataStreamOutput(JSONObject input,
        Map<String, String> patternRules) throws Exception {

    // env and patternRuleDescriptor are presumably defined elsewhere in
    // TestUtils; their declarations are not shown in this post.
    env.setParallelism(1);

    DataStream<JSONObject> inputSignal = env.fromElements(input);

    DataStream<Map<String, String>> rawPatternStream =
            env.fromElements(patternRules);

    // Generate (pattern name, pattern condition) pairs from the rule map.
    DataStream<Tuple2<String, Map<String, String>>> patternRuleStream =
            rawPatternStream.flatMap(
                    new FlatMapFunction<Map<String, String>,
                            Tuple2<String, Map<String, String>>>() {
                @Override
                public void flatMap(Map<String, String> patternRules,
                        Collector<Tuple2<String, Map<String, String>>> out)
                        throws Exception {
                    for (Map.Entry<String, String> stringEntry :
                            patternRules.entrySet()) {
                        JSONObject jsonObject =
                                new JSONObject(stringEntry.getValue());
                        Map<String, String> map = new HashMap<>();
                        for (String key : jsonObject.keySet()) {
                            String value = jsonObject.get(key).toString();
                            map.put(key, value);
                        }
                        out.collect(new Tuple2<>(stringEntry.getKey(), map));
                    }
                }
            });

    // Broadcast the stream built above (the original referenced an
    // undefined variable named patternStream here).
    BroadcastStream<Tuple2<String, Map<String, String>>> patternRuleBroadcast =
            patternRuleStream.broadcast(patternRuleDescriptor);

    DataStream<Tuple2<String, JSONObject>> validSignal = inputSignal
            .map(new MapFunction<JSONObject, Tuple2<String, JSONObject>>() {
                @Override
                public Tuple2<String, JSONObject> map(JSONObject inputSignal)
                        throws Exception {
                    String source = inputSignal.getSource();
                    return new Tuple2<>(source, inputSignal);
                }
            })
            .keyBy(0)
            .connect(patternRuleBroadcast)
            .process(new MyKeyedBroadCastProcessFunction());

    validSignal
            .map(new MapFunction<Tuple2<String, JSONObject>, JSONObject>() {
                @Override
                public JSONObject map(Tuple2<String, JSONObject> inputSignal)
                        throws Exception {
                    return inputSignal.f1;
                }
            })
            .addSink(new getDataStreamOutput());

    env.execute("TestFlink");
    return getDataStreamOutput.dataStreamOutput;
}


    @SuppressWarnings("serial")
    public static final class getDataStreamOutput
            implements SinkFunction<JSONObject> {
        // Holds the string form of the last record that reached the sink.
        public static String dataStreamOutput;

        public void invoke(JSONObject inputSignal) throws Exception {
            dataStreamOutput = inputSignal.toString();
        }
    }

I need to test different inputs against the same broadcast rules, but every
call to this method starts the whole job from scratch: it takes the input
signal and broadcasts the rules again. Is there a way to broadcast the rules
once and then keep sending inputs to the method? One option I explored is a
CoFlatMapFunction, as below, which combines two data streams so that input
keeps arriving while the method is running. For that, however, one of the
streams has to keep reading from a Kafka topic, and loading the Kafka
utilities and a server would overburden the test.

    DataStream<JSONObject> inputSignalFromKafka =
            env.addSource(inputSignalKafka);

    DataStream<org.json.JSONObject> inputSignalFromMethod =
            env.fromElements(inputSignal);

    DataStream<JSONObject> inputSignal =
            inputSignalFromMethod.connect(inputSignalFromKafka)
                    .flatMap(new SignalCoFlatMapper());


    // Forwards records from both inputs unchanged, merging the two streams.
    public static class SignalCoFlatMapper
            implements CoFlatMapFunction<JSONObject, JSONObject, JSONObject> {

        @Override
        public void flatMap1(JSONObject inputValue,
                Collector<JSONObject> out) throws Exception {
            out.collect(inputValue);
        }

        @Override
        public void flatMap2(JSONObject kafkaValue,
                Collector<JSONObject> out) throws Exception {
            out.collect(kafkaValue);
        }
    }

I found a Stack Overflow question, "How to unit test BroadcastProcessFunction
in flink when processElement depends on broadcasted data", but it confused me
a lot.

Is there any way to broadcast only once, in the @Before method of the test
class, and then keep sending different kinds of data to my broadcast function?


Thanks,
Rahul.

Re: How to write JUnit test cases for a KeyedBroadcastProcessFunction

Posted by David Anderson <da...@alpinegizmo.com>.
You could approach testing this in the same way that Flink has implemented
its unit tests for KeyedBroadcastProcessFunctions, which is to use
a KeyedTwoInputStreamOperatorTestHarness with
a CoBroadcastWithKeyedOperator. To learn how to use Flink's test harnesses,
see [1], and for an example of testing a KeyedBroadcastProcessFunction, see
[2].

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#unit-testing-stateful-or-timely-udfs--custom-operators
[2]
https://github.com/apache/flink/blob/release-1.11/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java

Best,
David
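
As a rough illustration of the test structure described in the reply above
(set up once, feed the broadcast side once, then push many keyed inputs and
inspect the output), here is a dependency-free Java sketch. RuleHarnessSketch
and its methods are hypothetical stand-ins, not Flink classes; in a real test
they would be replaced by a KeyedTwoInputStreamOperatorTestHarness wrapping a
CoBroadcastWithKeyedOperator, as in [2]. The rule format (one required field
name per rule) is also an assumption, chosen only to keep the example small.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, Flink-free model of a two-input test harness.
 * It only mirrors the test structure: broadcast once, then send many inputs.
 */
public class RuleHarnessSketch {
    // Stand-in for broadcast state: rule name -> field that must be present.
    private final Map<String, String> rules = new HashMap<>();
    // Stand-in for the operator output that a harness would collect.
    private final List<String> output = new ArrayList<>();

    /** Models the broadcast side: store one rule. */
    public void processBroadcastElement(String ruleName, String requiredField) {
        rules.put(ruleName, requiredField);
    }

    /** Models the keyed side: emit the key once if any stored rule matches. */
    public void processElement(String key, Map<String, String> signal) {
        for (String requiredField : rules.values()) {
            if (signal.containsKey(requiredField)) {
                output.add(key);
                return;
            }
        }
    }

    /** Returns what was emitted since the last call, then clears it. */
    public List<String> drainOutput() {
        List<String> copy = new ArrayList<>(output);
        output.clear();
        return copy;
    }

    public static void main(String[] args) {
        // Setup step: build the harness and broadcast the rules once.
        RuleHarnessSketch harness = new RuleHarnessSketch();
        harness.processBroadcastElement("hasTemperature", "temperature");

        // Case 1: a signal that satisfies the rule is emitted.
        Map<String, String> matching = new HashMap<>();
        matching.put("temperature", "21");
        harness.processElement("sensor-a", matching);
        System.out.println(harness.drainOutput()); // prints [sensor-a]

        // Case 2: same rules, different input; nothing is emitted.
        Map<String, String> other = new HashMap<>();
        other.put("humidity", "40");
        harness.processElement("sensor-b", other);
        System.out.println(harness.drainOutput()); // prints []
    }
}
```

With the real harness the same shape should apply: create and open it in a
setup method, call processElement2 for each broadcast rule, then call
processElement1 once per test input and assert on the collected output. No
job is started and no Kafka source is involved, so repeating the setup for
each test is cheap.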

