You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Megha Gandhi <g1...@gmail.com> on 2021/08/12 18:32:47 UTC

Re: Flink not processing the records


> On Aug 12, 2021, at 11:31 AM, Megha Gandhi <g1...@gmail.com> wrote:
> 
> Hi 
> This is my flink app code 
> DataStream<String> input = createSourceFromStaticConfig(env);
> // Map it to a json
> ObjectMapper jsonParser = new ObjectMapper();
> DataStream<Tuple3<String, Double, Long>> inputStream = input.map(value -> { // Parse the JSON
>             JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);
>             return new Tuple3<>(jsonNode.get("TICKER").asText(), jsonNode.get("PRICE").asDouble(), jsonNode.get("EVENT_TIME").asLong());
>         }).returns(Types.TUPLE(Types.STRING, Types.DOUBLE, Types.LONG));
> 
> SingleOutputStreamOperator<String> fiveMinSum = inputStream.assignTimestampsAndWatermarks(new TimestampExtractor())
>         .keyBy(t -> t.f0) // Logically partition the stream per stock symbol
>         .window(SlidingEventTimeWindows.of(Time.minutes(3), Time.minutes(1))) // Sliding window definition
>         .process(new MyProcessFunction());
> fiveMinSum.print();
> fiveMinSum.addSink(createS3SinkFromStaticConfig());
> env.execute("Flink Streaming Java API Skeleton");
> 
> 
> On execution in flink dashboard, I see data coming in but records are not being sent. Even though I added print as a sink, I do not see any messages in stdout in flink task manager console. How can I debug this?
> There are no exceptions as well in the dashboard.
> 
> Any inputs on how to solve this?


Re: Flink not processing the records

Posted by JING ZHANG <be...@gmail.com>.
Hi Megha,
Event window would be triggered after the watermark passed the end of
window. Would you please check the watermark value on the Flink UI.

Best,
JING ZHANG

Megha Gandhi <g1...@gmail.com> 于2021年8月13日周五 上午2:33写道:

>
>
> On Aug 12, 2021, at 11:31 AM, Megha Gandhi <g1...@gmail.com> wrote:
>
> Hi
> This is my flink app code
>
> DataStream<String> input = createSourceFromStaticConfig(env);
> // Map it to a json
> ObjectMapper jsonParser = new ObjectMapper();
> DataStream<Tuple3<String, Double, Long>> inputStream = input.map(value -> { // Parse the JSON
>             JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);
>             return new Tuple3<>(jsonNode.get("TICKER").asText(), jsonNode.get("PRICE").asDouble(), jsonNode.get("EVENT_TIME").asLong());
>         }).returns(Types.TUPLE(Types.STRING, Types.DOUBLE, Types.LONG));
>
> SingleOutputStreamOperator<String> fiveMinSum = inputStream.assignTimestampsAndWatermarks(new TimestampExtractor())
>         .keyBy(t -> t.f0) // Logically partition the stream per stock symbol
>         .window(SlidingEventTimeWindows.of(Time.minutes(3), Time.minutes(1))) // Sliding window definition
>         .process(new MyProcessFunction());
> fiveMinSum.print();
> fiveMinSum.addSink(createS3SinkFromStaticConfig());
> env.execute("Flink Streaming Java API Skeleton");
>
>
>
> On execution in flink dashboard, I see data coming in but records are not
> being sent. Even though I added print as a sink, I do not see any messages
> in stdout in flink task manager console. How can I debug this?
> There are no exceptions as well in the dashboard.
>
> Any inputs on how to solve this?
>
>
>