You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Megha Gandhi <g1...@gmail.com> on 2021/08/12 18:32:47 UTC
Re: Flink not processing the records
> On Aug 12, 2021, at 11:31 AM, Megha Gandhi <g1...@gmail.com> wrote:
>
> Hi
> This is my flink app code
> DataStream<String> input = createSourceFromStaticConfig(env);
> // Map it to a json
> ObjectMapper jsonParser = new ObjectMapper();
> DataStream<Tuple3<String, Double, Long>> inputStream = input.map(value -> { // Parse the JSON
> JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);
> return new Tuple3<>(jsonNode.get("TICKER").asText(), jsonNode.get("PRICE").asDouble(), jsonNode.get("EVENT_TIME").asLong());
> }).returns(Types.TUPLE(Types.STRING, Types.DOUBLE, Types.LONG));
>
> SingleOutputStreamOperator<String> fiveMinSum = inputStream.assignTimestampsAndWatermarks(new TimestampExtractor())
> .keyBy(t -> t.f0) // Logically partition the stream per stock symbol
> .window(SlidingEventTimeWindows.of(Time.minutes(3), Time.minutes(1))) // Sliding window definition
> .process(new MyProcessFunction());
> fiveMinSum.print();
> fiveMinSum.addSink(createS3SinkFromStaticConfig());
> env.execute("Flink Streaming Java API Skeleton");
>
>
> On execution in flink dashboard, I see data coming in but records are not being sent. Even though I added print as a sink, I do not see any messages in stdout in flink task manager console. How can I debug this?
> There are no exceptions as well in the dashboard.
>
> Any inputs on how to solve this?
Re: Flink not processing the records
Posted by JING ZHANG <be...@gmail.com>.
Hi Megha,
Event window would be triggered after the watermark passed the end of
window. Would you please check the watermark value on the Flink UI.
Best,
JING ZHANG
Megha Gandhi <g1...@gmail.com> 于2021年8月13日周五 上午2:33写道:
>
>
> On Aug 12, 2021, at 11:31 AM, Megha Gandhi <g1...@gmail.com> wrote:
>
> Hi
> This is my flink app code
>
> DataStream<String> input = createSourceFromStaticConfig(env);
> // Map it to a json
> ObjectMapper jsonParser = new ObjectMapper();
> DataStream<Tuple3<String, Double, Long>> inputStream = input.map(value -> { // Parse the JSON
> JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);
> return new Tuple3<>(jsonNode.get("TICKER").asText(), jsonNode.get("PRICE").asDouble(), jsonNode.get("EVENT_TIME").asLong());
> }).returns(Types.TUPLE(Types.STRING, Types.DOUBLE, Types.LONG));
>
> SingleOutputStreamOperator<String> fiveMinSum = inputStream.assignTimestampsAndWatermarks(new TimestampExtractor())
> .keyBy(t -> t.f0) // Logically partition the stream per stock symbol
> .window(SlidingEventTimeWindows.of(Time.minutes(3), Time.minutes(1))) // Sliding window definition
> .process(new MyProcessFunction());
> fiveMinSum.print();
> fiveMinSum.addSink(createS3SinkFromStaticConfig());
> env.execute("Flink Streaming Java API Skeleton");
>
>
>
> On execution in flink dashboard, I see data coming in but records are not
> being sent. Even though I added print as a sink, I do not see any messages
> in stdout in flink task manager console. How can I debug this?
> There are no exceptions as well in the dashboard.
>
> Any inputs on how to solve this?
>
>
>