Posted to dev@flink.apache.org by David Anderson <da...@alpinegizmo.com> on 2020/07/17 09:52:41 UTC

Re: How to write junit testcases for KeyedBroadcastProcessFunction

You could approach testing this in the same way that Flink has implemented
its unit tests for KeyedBroadcastProcessFunctions, which is to use
a KeyedTwoInputStreamOperatorTestHarness with
a CoBroadcastWithKeyedOperator. To learn how to use Flink's test harnesses,
see [1], and for an example of testing a KeyedBroadcastProcessFunction, see
[2].

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#unit-testing-stateful-or-timely-udfs--custom-operators
[2]
https://github.com/apache/flink/blob/release-1.11/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
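
Untested, but roughly the setup could look like the sketch below. It assumes
your MyKeyedBroadCastProcessFunction is declared as a
KeyedBroadcastProcessFunction<String, Tuple2<String, JSONObject>,
Tuple2<String, Map<String, String>>, Tuple2<String, JSONObject>> keyed by the
signal source, and it guesses at the MapStateDescriptor your job broadcasts
with, so adjust the types to match your function. The harness comes from the
flink-streaming-java test-jar (see [1] for the dependencies). Because the
rules are broadcast in the setup method, each test only has to push keyed
inputs:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.typeutils.MapTypeInfo;
    import org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;

    import org.json.JSONObject;
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.Assertions;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;

    public class MyKeyedBroadCastProcessFunctionTest {

        // the same descriptor your job broadcasts with (its type is an assumption here)
        private final MapStateDescriptor<String, Map<String, String>> patternRuleDescriptor =
                new MapStateDescriptor<>(
                        "patternRules",
                        BasicTypeInfo.STRING_TYPE_INFO,
                        new MapTypeInfo<>(String.class, String.class));

        private KeyedTwoInputStreamOperatorTestHarness<
                String,                              // key: the signal source
                Tuple2<String, JSONObject>,          // keyed input
                Tuple2<String, Map<String, String>>, // broadcast input
                Tuple2<String, JSONObject>>          // output
                harness;

        @BeforeEach
        public void setup() throws Exception {
            // wrap the function in the operator that Flink's own tests use
            CoBroadcastWithKeyedOperator<
                    String,
                    Tuple2<String, JSONObject>,
                    Tuple2<String, Map<String, String>>,
                    Tuple2<String, JSONObject>> operator =
                    new CoBroadcastWithKeyedOperator<>(
                            new MyKeyedBroadCastProcessFunction(),
                            Collections.singletonList(patternRuleDescriptor));

            harness = new KeyedTwoInputStreamOperatorTestHarness<>(
                    operator,
                    signal -> signal.f0,             // key selector for the keyed input
                    null,                            // the broadcast side is not keyed
                    BasicTypeInfo.STRING_TYPE_INFO);
            harness.open();

            // broadcast the pattern rules once here, rather than once per input
            Map<String, String> ruleConditions = new HashMap<>();
            ruleConditions.put("field", "expectedValue"); // placeholder condition
            harness.processElement2(new StreamRecord<>(Tuple2.of("rule1", ruleConditions), 0L));
        }

        @Test
        public void emitsSignalThatMatchesBroadcastRules() throws Exception {
            // feed as many keyed inputs as you like against the same broadcast state
            JSONObject inputSignal = new JSONObject().put("field", "expectedValue");
            harness.processElement1(new StreamRecord<>(Tuple2.of("sourceA", inputSignal), 1L));

            // getOutput() contains the StreamRecords emitted by your function
            Assertions.assertFalse(harness.getOutput().isEmpty());
        }

        @AfterEach
        public void tearDown() throws Exception {
            harness.close();
        }
    }

With this approach there is no job to start, no sink, and no Kafka involved,
so re-broadcasting the rules in the setup method before each test is cheap.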

Best,
David

On Wed, Jul 15, 2020 at 8:32 PM bujjirahul45 <ra...@gmail.com> wrote:

> Hi,
>
> I am new to Flink and I am trying to write JUnit test cases for a
> KeyedBroadcastProcessFunction. Below is my code. I currently call the
> getDataStreamOutput method in a TestUtils class and pass it the input
> data and the pattern rules; the input data is evaluated against the
> list of pattern rules, and if the input data satisfies a condition I
> get the signal, call the sink function, and return the output data as
> a String from getDataStreamOutput.
>
>     @Test
>     public void testCompareInputAndOutputDataForInputSignal() throws Exception {
>         Assertions.assertEquals(sampleInputSignal,
>                 TestUtils.getDataStreamOutput(
>                         inputSignal,
>                         patternRules));
>     }
>
>
>     public static String getDataStreamOutput(JSONObject input,
>                                               Map<String, String> patternRules) throws Exception {
>
>         env.setParallelism(1);
>
>         DataStream<JSONObject> inputSignal = env.fromElements(input);
>
>         DataStream<Map<String, String>> rawPatternStream =
>                 env.fromElements(patternRules);
>
>         // Generate key/value pairs of patterns, where the key is the pattern
>         // name and the value is the pattern condition
>         DataStream<Tuple2<String, Map<String, String>>> patternRuleStream =
>                 rawPatternStream.flatMap(new FlatMapFunction<Map<String, String>,
>                         Tuple2<String, Map<String, String>>>() {
>                     @Override
>                     public void flatMap(Map<String, String> patternRules,
>                                         Collector<Tuple2<String, Map<String, String>>> out) throws Exception {
>                         for (Map.Entry<String, String> stringEntry : patternRules.entrySet()) {
>                             JSONObject jsonObject = new JSONObject(stringEntry.getValue());
>                             Map<String, String> map = new HashMap<>();
>                             for (String key : jsonObject.keySet()) {
>                                 String value = jsonObject.get(key).toString();
>                                 map.put(key, value);
>                             }
>                             out.collect(new Tuple2<>(stringEntry.getKey(), map));
>                         }
>                     }
>                 });
>
>         BroadcastStream<Tuple2<String, Map<String, String>>> patternRuleBroadcast =
>                 patternRuleStream.broadcast(patternRuleDescriptor);
>
>         DataStream<Tuple2<String, JSONObject>> validSignal = inputSignal
>                 .map(new MapFunction<JSONObject, Tuple2<String, JSONObject>>() {
>                     @Override
>                     public Tuple2<String, JSONObject> map(JSONObject inputSignal) throws Exception {
>                         String source = inputSignal.getSource();
>                         return new Tuple2<>(source, inputSignal);
>                     }
>                 })
>                 .keyBy(0)
>                 .connect(patternRuleBroadcast)
>                 .process(new MyKeyedBroadCastProcessFunction());
>
>         validSignal.map(new MapFunction<Tuple2<String, JSONObject>, JSONObject>() {
>             @Override
>             public JSONObject map(Tuple2<String, JSONObject> inputSignal) throws Exception {
>                 return inputSignal.f1;
>             }
>         }).addSink(new getDataStreamOutput());
>
>         env.execute("TestFlink");
>
>         return getDataStreamOutput.dataStreamOutput;
>     }
>
>
>     @SuppressWarnings("serial")
>     public static final class getDataStreamOutput implements SinkFunction<JSONObject> {
>         public static String dataStreamOutput;
>
>         public void invoke(JSONObject inputSignal) throws Exception {
>             dataStreamOutput = inputSignal.toString();
>         }
>     }
>
> I need to test different inputs against the same broadcast rules, but
> each time I call this method it runs the whole process again from the
> beginning: take the input signal, broadcast the rules, and so on. Is
> there a way I can broadcast once and keep sending new inputs to the
> method? I explored using a CoFlatMapFunction, something like the code
> below, to combine the data streams and keep sending inputs while the
> job is running, but for that one of the streams has to keep reading
> from a Kafka topic, which overburdens the test with loading the Kafka
> utilities and a server.
>
>     DataStream<JSONObject> inputSignalFromKafka =
>             env.addSource(inputSignalKafka);
>
>     DataStream<org.json.JSONObject> inputSignalFromMethod =
>             env.fromElements(inputSignal);
>
>     DataStream<JSONObject> inputSignal =
>             inputSignalFromMethod.connect(inputSignalFromKafka)
>                     .flatMap(new SignalCoFlatMapper());
>
>
>     public static class SignalCoFlatMapper
>             implements CoFlatMapFunction<JSONObject, JSONObject, JSONObject> {
>
>         @Override
>         public void flatMap1(JSONObject inputValue, Collector<JSONObject> out) throws Exception {
>             out.collect(inputValue);
>         }
>
>         @Override
>         public void flatMap2(JSONObject kafkaValue, Collector<JSONObject> out) throws Exception {
>             out.collect(kafkaValue);
>         }
>     }
>
> I found a Stack Overflow post, "How to unit test BroadcastProcessFunction
> in flink when processElement depends on broadcasted data", but it has
> confused me a lot.
>
> Is there any way I can broadcast only once, in a @Before method of the
> test class, and then keep sending different kinds of data to my
> broadcast function?
>
>
> Thanks,
> Rahul.
>