Posted to dev@flink.apache.org by David Anderson <da...@alpinegizmo.com> on 2020/07/17 09:52:41 UTC
Re: How to write junit testcases for KeyedBroadCastProcess Function
You could approach testing this in the same way that Flink has implemented
its unit tests for KeyedBroadcastProcessFunctions, which is to use
a KeyedTwoInputStreamOperatorTestHarness with
a CoBroadcastWithKeyedOperator. To learn how to use Flink's test harnesses,
see [1], and for an example of testing a KeyedBroadcastProcessFunction, see
[2].
[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#unit-testing-stateful-or-timely-udfs--custom-operators
[2] https://github.com/apache/flink/blob/release-1.11/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
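A minimal sketch of this approach follows. It assumes Flink 1.11, JUnit 5, and the test-scoped dependencies listed in [1] (the flink-streaming-java and flink-runtime test-jars). The rules are broadcast once in setUp() through processElement2. Each test then pushes a single signal through processElement1 and checks extractOutputValues(). No job is executed, so nothing gets rebuilt for each input. RuleMatcher, the RULES descriptor, and the (source, payload) signal format are hypothetical stand-ins for MyKeyedBroadCastProcessFunction, patternRuleDescriptor, and the JSON signals.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class PatternRuleHarnessTest {

    // Hypothetical broadcast-state descriptor: rule name -> substring a signal must contain.
    static final MapStateDescriptor<String, String> RULES =
            new MapStateDescriptor<>("rules", Types.STRING, Types.STRING);

    // Hypothetical stand-in for MyKeyedBroadCastProcessFunction.
    // Keyed input: (source, payload). Broadcast input: (ruleName, substring).
    static class RuleMatcher extends KeyedBroadcastProcessFunction<
            String, Tuple2<String, String>, Tuple2<String, String>, String> {

        @Override
        public void processElement(Tuple2<String, String> signal, ReadOnlyContext ctx,
                Collector<String> out) throws Exception {
            // Emit the payload once if it matches any rule currently in broadcast state.
            for (Map.Entry<String, String> rule : ctx.getBroadcastState(RULES).immutableEntries()) {
                if (signal.f1.contains(rule.getValue())) {
                    out.collect(signal.f1);
                    return;
                }
            }
        }

        @Override
        public void processBroadcastElement(Tuple2<String, String> rule, Context ctx,
                Collector<String> out) throws Exception {
            // Store (or overwrite) the rule in broadcast state.
            ctx.getBroadcastState(RULES).put(rule.f0, rule.f1);
        }
    }

    private KeyedTwoInputStreamOperatorTestHarness<
            String, Tuple2<String, String>, Tuple2<String, String>, String> harness;

    @BeforeEach
    void setUp() throws Exception {
        List<MapStateDescriptor<?, ?>> descriptors = Collections.singletonList(RULES);
        KeySelector<Tuple2<String, String>, String> bySource = signal -> signal.f0;

        harness = new KeyedTwoInputStreamOperatorTestHarness<>(
                new CoBroadcastWithKeyedOperator<>(new RuleMatcher(), descriptors),
                bySource,      // key selector for the keyed input (input 1)
                null,          // the broadcast input (input 2) is not keyed
                Types.STRING);
        harness.open();

        // Broadcast the rules once; the tests below only send signals.
        harness.processElement2(new StreamRecord<>(Tuple2.of("errorRule", "ERROR")));
        harness.processElement2(new StreamRecord<>(Tuple2.of("warnRule", "WARN")));
    }

    @AfterEach
    void tearDown() throws Exception {
        harness.close();
    }

    @Test
    void signalMatchingARuleIsEmitted() throws Exception {
        harness.processElement1(new StreamRecord<>(Tuple2.of("sensor-1", "ERROR: disk full")));
        Assertions.assertEquals(
                Collections.singletonList("ERROR: disk full"), harness.extractOutputValues());
    }

    @Test
    void signalMatchingNoRuleIsDropped() throws Exception {
        harness.processElement1(new StreamRecord<>(Tuple2.of("sensor-2", "INFO: all good")));
        Assertions.assertTrue(harness.extractOutputValues().isEmpty());
    }
}
```

@BeforeEach recreates the harness for every test. That is cheap because everything runs in memory. To reuse one harness for several inputs inside a single test instead, call harness.getOutput().clear() between inputs.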
Best,
David
On Wed, Jul 15, 2020 at 8:32 PM bujjirahul45 <ra...@gmail.com> wrote:
> Hi,
>
> I am new to Flink and I am trying to write JUnit test cases for a
> KeyedBroadcastProcessFunction. My code is below. I currently call the
> getDataStreamOutput method in the TestUtils class and pass it the input data
> and the pattern rules. The input data is evaluated against the list of
> pattern rules. If the input data satisfies a condition, I get a signal,
> call the sink function, and return the output data as a string from
> getDataStreamOutput.
>
> @Test
> public void testCompareInputAndOutputDataForInputSignal() throws Exception {
>     Assertions.assertEquals(sampleInputSignal,
>             TestUtils.getDataStreamOutput(inputSignal, patternRules));
> }
>
>
>
> public static String getDataStreamOutput(JSONObject input,
>         Map<String, String> patternRules) throws Exception {
>
>     env.setParallelism(1);
>
>     DataStream<JSONObject> inputSignal = env.fromElements(input);
>
>     DataStream<Map<String, String>> rawPatternStream =
>             env.fromElements(patternRules);
>
>     // Generate (key, value) pairs of patterns, where the key is the
>     // pattern name and the value is the pattern condition
>     DataStream<Tuple2<String, Map<String, String>>> patternRuleStream =
>             rawPatternStream.flatMap(new FlatMapFunction<Map<String, String>,
>                     Tuple2<String, Map<String, String>>>() {
>                 @Override
>                 public void flatMap(Map<String, String> patternRules,
>                         Collector<Tuple2<String, Map<String, String>>> out) throws Exception {
>                     for (Map.Entry<String, String> stringEntry : patternRules.entrySet()) {
>                         JSONObject jsonObject = new JSONObject(stringEntry.getValue());
>                         Map<String, String> map = new HashMap<>();
>                         for (String key : jsonObject.keySet()) {
>                             String value = jsonObject.get(key).toString();
>                             map.put(key, value);
>                         }
>                         out.collect(new Tuple2<>(stringEntry.getKey(), map));
>                     }
>                 }
>             });
>
>     BroadcastStream<Tuple2<String, Map<String, String>>> patternRuleBroadcast =
>             patternRuleStream.broadcast(patternRuleDescriptor);
>
>     DataStream<Tuple2<String, JSONObject>> validSignal =
>             inputSignal.map(new MapFunction<JSONObject, Tuple2<String, JSONObject>>() {
>                 @Override
>                 public Tuple2<String, JSONObject> map(JSONObject inputSignal) throws Exception {
>                     String source = inputSignal.getSource();
>                     return new Tuple2<>(source, inputSignal);
>                 }
>             }).keyBy(0).connect(patternRuleBroadcast).process(new MyKeyedBroadCastProcessFunction());
>
>     validSignal.map(new MapFunction<Tuple2<String, JSONObject>, JSONObject>() {
>         @Override
>         public JSONObject map(Tuple2<String, JSONObject> inputSignal) throws Exception {
>             return inputSignal.f1;
>         }
>     }).addSink(new DataStreamOutputSink());
>
>     env.execute("TestFlink");
>     return DataStreamOutputSink.dataStreamOutput;
> }
>
> @SuppressWarnings("serial")
> public static final class DataStreamOutputSink implements SinkFunction<JSONObject> {
>     // Static so the test can read it after env.execute(); this only works
>     // when the job runs locally in the same JVM as the test
>     public static volatile String dataStreamOutput;
>
>     @Override
>     public void invoke(JSONObject inputSignal) throws Exception {
>         dataStreamOutput = inputSignal.toString();
>     }
> }
> I need to test different inputs against the same broadcast rules, but each
> time I call this function it runs the whole process again from the
> beginning: it takes the input signal and broadcasts the rule data again.
> Is there a way to broadcast once and keep sending inputs to the method? I
> explored using a CoFlatMapFunction, like the one below, to combine the data
> streams and keep sending inputs while the method is running. However, one of
> the data streams then has to keep reading from a Kafka topic, which
> overburdens the method with loading the Kafka utilities and server.
>
> DataStream<JSONObject> inputSignalFromKafka = env.addSource(inputSignalKafka);
>
> DataStream<org.json.JSONObject> inputSignalFromMethod = env.fromElements(inputSignal);
>
> DataStream<JSONObject> mergedSignal = inputSignalFromMethod.connect(inputSignalFromKafka)
>         .flatMap(new SignalCoFlatMapper());
>
>
> public static class SignalCoFlatMapper
>         implements CoFlatMapFunction<JSONObject, JSONObject, JSONObject> {
>
>     @Override
>     public void flatMap1(JSONObject inputValue, Collector<JSONObject> out) throws Exception {
>         out.collect(inputValue);
>     }
>
>     @Override
>     public void flatMap2(JSONObject kafkaValue, Collector<JSONObject> out) throws Exception {
>         out.collect(kafkaValue);
>     }
> }
> I found the Stack Overflow question "How to unit test BroadcastProcessFunction
> in flink when processElement depends on broadcasted data", but it confused
> me a lot.
>
> Is there any way to broadcast only once, in a @Before method of the test
> class, and then keep sending different kinds of data to my broadcast function?
>
>
> Thanks,
> Rahul.
>