Posted to user@flink.apache.org by Stephen Connolly <st...@gmail.com> on 2019/02/19 11:14:41 UTC

How to debug difference between Kinesis and Kafka

Hi, I’m having a strange situation and I would like to know where I should
start trying to debug.

I have set up a configurable, swappable source with three implementations:

1. A mock implementation
2. A Kafka consumer implementation
3. A Kinesis consumer implementation

By injecting a logging, no-op map function, I can see that all three
sources pass the events through correctly.

I then have a window based on event timestamps, and from inspecting the
aggregation function I can see that the data is getting aggregated. I’m
using the `.aggregate(AggregateFunction, WindowFunction)` variant so that I
can retrieve the key.
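The custom `TumblingEventOffsetPerKeyEventTimeWindows` assigner is not shown in the thread, so the following is only a sketch under the assumption that it computes window boundaries the way Flink's built-in tumbling windows do (the formula behind `TimeWindow.getWindowStartWithOffset`), with the offset chosen per key. `TumblingWindowModel` and its methods are hypothetical stand-ins, not Flink classes. The point it illustrates: with `EventTimeTrigger`, the window function only runs once the operator's watermark reaches the window's last timestamp, however much data the AggregateFunction has already seen.

```java
/**
 * Hypothetical, simplified model (not Flink code) of tumbling event-time windows
 * with a per-key offset, and of the condition under which EventTimeTrigger fires.
 */
final class TumblingWindowModel {

    private TumblingWindowModel() {}

    /** Start of the window that contains {@code timestamp} (all values in milliseconds). */
    static long windowStart(long timestamp, long offset, long size) {
        long remainder = (timestamp - offset) % size;
        // Java's % keeps the sign of the dividend, so shift negative remainders into range.
        return remainder < 0 ? timestamp - (remainder + size) : timestamp - remainder;
    }

    /** Windows cover [start, start + size), so the last timestamp inside is start + size - 1. */
    static long maxTimestamp(long start, long size) {
        return start + size - 1;
    }

    /** The window function runs once the operator's watermark reaches the window's max timestamp. */
    static boolean fires(long start, long size, long watermark) {
        return watermark >= maxTimestamp(start, size);
    }

    public static void main(String[] args) {
        long size = 10_000L;   // 10-second windows
        long offset = 3_000L;  // hypothetical per-key offset
        long start = windowStart(12_345L, offset, size);
        System.out.println("window = [" + start + ", " + (start + size) + ")");
        System.out.println("fires at watermark 12998? " + fires(start, size, 12_998L));
        System.out.println("fires at watermark 12999? " + fires(start, size, 12_999L));
    }
}
```

Running `main` reports the window `[3000, 13000)`, which does not fire at watermark 12998 and does fire at 12999. A watermark stuck at `Long.MIN_VALUE` never satisfies `fires`, whatever the window contains.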

Here’s the strange thing: I only change the source (and each source uses
the same deserialization schema), but:


   - When I use either Kafka or my Mock source, the WindowFunction gets
   called as events pass the end of the window.
   - When I use the Kinesis source, however, the window function never gets
   called. I have even tried injecting events into Kinesis with really high
   timestamps to flush the watermarks in my
   BoundedOutOfOrdernessTimestampExtractor... but nothing.
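A rough model of one parallel instance of a bounded-out-of-orderness assigner shows why injecting very high timestamps did not flush anything. This is a simplified stand-alone sketch, assuming the assigner behaves like Flink's `BoundedOutOfOrdernessTimestampExtractor` (track the highest timestamp seen, report it minus the bound, never move backwards). `OutOfOrdernessModel` is a hypothetical name, and the shard-to-instance mapping in `main` is illustrative only. The key property is that an instance which never receives an event keeps reporting `Long.MIN_VALUE`, no matter how far ahead the other instances are.

```java
/**
 * Hypothetical, simplified model (not the Flink class) of one parallel instance of a
 * bounded-out-of-orderness periodic watermark assigner.
 */
final class OutOfOrdernessModel {
    private final long boundMs;
    private long maxTimestamp;
    private long lastWatermark = Long.MIN_VALUE;

    OutOfOrdernessModel(long boundMs) {
        this.boundMs = boundMs;
        // Start just high enough that (maxTimestamp - boundMs) cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + boundMs;
    }

    /** Called for every event this instance receives. */
    void onEvent(long timestampMs) {
        maxTimestamp = Math.max(maxTimestamp, timestampMs);
    }

    /** Called on every auto-watermark interval; the result never moves backwards. */
    long currentWatermark() {
        lastWatermark = Math.max(lastWatermark, maxTimestamp - boundMs);
        return lastWatermark;
    }

    public static void main(String[] args) {
        OutOfOrdernessModel shard0 = new OutOfOrdernessModel(5_000L); // its shard never gets data
        OutOfOrdernessModel shard1 = new OutOfOrdernessModel(5_000L); // its shard gets everything
        shard1.onEvent(60_000L);
        shard1.onEvent(9_999_999_999L); // a "really high" timestamp
        System.out.println("instance reading shard-0: " + shard0.currentWatermark());
        System.out.println("instance reading shard-1: " + shard1.currentWatermark());
    }
}
```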

I cannot see how switching the source could result in such different
behaviour:

        Properties sourceProperties = new Properties();
        ConsumerFactory sourceFactory;
        String sourceName = configParams.getRequired("source");
        switch (sourceName.toLowerCase(Locale.ENGLISH)) {
            case "kinesis":
                sourceFactory = FlinkKinesisConsumer::new;
                copyOptionalArg(configParams, "aws-region", sourceProperties, AWSConfigConstants.AWS_REGION);
                copyOptionalArg(configParams, "aws-endpoint", sourceProperties, AWSConfigConstants.AWS_ENDPOINT);
                copyOptionalArg(configParams, "aws-access-key", sourceProperties, AWSConfigConstants.AWS_ACCESS_KEY_ID);
                copyOptionalArg(configParams, "aws-secret-key", sourceProperties, AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
                copyOptionalArg(configParams, "aws-profile", sourceProperties, AWSConfigConstants.AWS_PROFILE_NAME);
                break;
            case "kafka":
                sourceFactory = FlinkKafkaConsumer010::new;
                copyRequiredArg(configParams, "bootstrap-server", sourceProperties, "bootstrap.servers");
                copyOptionalArg(configParams, "group-id", sourceProperties, "group.id");
                break;
            case "mock":
                sourceFactory = MockSourceFunction::new;
                break;
            default:
                throw new RuntimeException("Unknown source '" + sourceName + '\'');
        }

        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // poll watermark every second because using BoundedOutOfOrdernessTimestampExtractor
        env.getConfig().setAutoWatermarkInterval(1000L);
        env.enableCheckpointing(5000);

        SplitStream<JsonNode> eventsByType = env.addSource(sourceFactory.create(
                configParams.getRequired("topic"),
                new ObjectNodeDeserializationSchema(),
                sourceProperties
        ))
                .returns(ObjectNode.class) // the use of ConsumerFactory erases the type info so add it back
                .name("raw-events")
                .assignTimestampsAndWatermarks(
                        new ObjectNodeBoundedOutOfOrdernessTimestampExtractor("timestamp", Time.seconds(5))
                )
                .split(new JsonNodeOutputSelector("eventType"));
...
        eventsByType.select(...)
                .keyBy(new JsonNodeStringKeySelector("_key"))
                .window(TumblingEventOffsetPerKeyEventTimeWindows.of(Time.seconds(windowDuration),
                        (KeySelector<JsonNode, Time>) TasksMain::offsetPerMaster))
                .trigger(EventTimeTrigger.create())
                .aggregate(new CountsAggregator<>(), new KeyTagger<>()) // <==== The CountsAggregator is seeing the data
                .print() // <==== HERE is where we get no output from Kinesis... but Kafka and my Mock are just fine!

Re: EXT :Re: How to debug difference between Kinesis and Kafka

Posted by Dian Fu <di...@gmail.com>.
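DataStream.assignTimestampsAndWatermarks adds a watermark generator after each parallel instance of the source operator (when their parallelism is the same, which is true for the code you showed), so if one source instance receives no data, the corresponding watermark generator cannot generate a watermark. Since a downstream operator's event time is the minimum of the watermarks arriving on all of its inputs, that one silent instance keeps the window operator's watermark from advancing.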

Regards,
Dian
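Dian's explanation can be sketched as a small model of how a downstream operator, such as the window operator, merges the watermarks arriving on its input channels: it takes the minimum over all of them and skips only channels explicitly marked idle. This is a hedged, simplified illustration rather than Flink's implementation; `WatermarkMergeModel`, `Channel` and the `idle` flag are hypothetical names. In Flink itself a source can declare itself idle through `SourceFunction.SourceContext#markAsTemporarilyIdle`, and newer releases (1.11 onwards) also offer `WatermarkStrategy#withIdleness`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

/** Hypothetical, simplified model of how an operator merges watermarks from its input channels. */
final class WatermarkMergeModel {

    /** One input channel: the last watermark it delivered and whether it is marked idle. */
    static final class Channel {
        final long watermark;
        final boolean idle;

        Channel(long watermark, boolean idle) {
            this.watermark = watermark;
            this.idle = idle;
        }
    }

    /** Minimum watermark over all non-idle channels; empty when every channel is idle. */
    static OptionalLong combined(List<Channel> channels) {
        return channels.stream()
                .filter(c -> !c.idle)
                .mapToLong(c -> c.watermark)
                .min();
    }

    public static void main(String[] args) {
        // Subtask 0 never saw data, so it still reports Long.MIN_VALUE; subtask 1 is far ahead.
        List<Channel> noIdleness = Arrays.asList(
                new Channel(Long.MIN_VALUE, false), new Channel(55_000L, false));
        List<Channel> subtask0Idle = Arrays.asList(
                new Channel(Long.MIN_VALUE, true), new Channel(55_000L, false));
        System.out.println("no idleness:     " + combined(noIdleness).getAsLong());
        System.out.println("subtask 0 idle:  " + combined(subtask0Idle).getAsLong());
    }
}
```

Without idle marking the merged watermark stays at `Long.MIN_VALUE`; once subtask 0 is marked idle it becomes 55000.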


> On 20 Feb 2019, at 12:56 AM, Stephen Connolly <st...@gmail.com> wrote:
> 
> Though I am explicitly assigning watermarks with DataStream.assignTimestampsAndWatermarks and I see all the data flowing through that... so shouldn't that override the watermarks from the original source?


Re: EXT :Re: How to debug difference between Kinesis and Kafka

Posted by Stephen Connolly <st...@gmail.com>.
Though I am explicitly assigning watermarks with
DataStream.assignTimestampsAndWatermarks and I see all the data flowing
through that... so shouldn't that override the watermarks from the original
source?
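On the question of overriding: `assignTimestampsAndWatermarks` does replace timestamps and watermarks, but each assigner instance only ever sees the records of the source subtask it is connected to, so an instance fed by an empty shard has nothing to base a watermark on. One commonly suggested workaround (not something proposed in this thread) is to redistribute records, for example with `rebalance()`, before assigning watermarks, so that every assigner instance receives data. The stand-alone model below compares the two layouts. `RebalanceModel` is hypothetical, it assumes shard i is read by subtask i, and it simplifies Flink's round-robin distribution to a single global counter.

```java
import java.util.Arrays;

/**
 * Hypothetical model (not Flink code): compares assigning watermarks directly after each
 * source subtask with assigning them after a round-robin redistribution of the records.
 */
final class RebalanceModel {

    private RebalanceModel() {}

    /**
     * eventsPerSubtask[i] holds the event timestamps read by source subtask i.
     * Without rebalance, assigner i sees exactly subtask i's events; with rebalance,
     * events are dealt round-robin over all assigners (simplified to one global counter).
     * Returns the watermark the downstream operator sees: the minimum over all assigners.
     */
    static long downstreamWatermark(long[][] eventsPerSubtask, boolean rebalance, long boundMs) {
        int n = eventsPerSubtask.length;
        long[] maxSeen = new long[n];
        boolean[] sawAny = new boolean[n];
        Arrays.fill(maxSeen, Long.MIN_VALUE);
        int next = 0;
        for (int subtask = 0; subtask < n; subtask++) {
            for (long ts : eventsPerSubtask[subtask]) {
                int target = rebalance ? next++ % n : subtask;
                maxSeen[target] = Math.max(maxSeen[target], ts);
                sawAny[target] = true;
            }
        }
        long min = Long.MAX_VALUE;
        for (int i = 0; i < n; i++) {
            // An assigner that saw nothing has no basis for a watermark.
            long watermark = sawAny[i] ? maxSeen[i] - boundMs : Long.MIN_VALUE;
            min = Math.min(min, watermark);
        }
        return min;
    }

    public static void main(String[] args) {
        // Everything lands on shard-1 (subtask 1); shard-0 (subtask 0) stays empty.
        long[][] events = { {}, {10_000L, 20_000L, 30_000L, 40_000L} };
        System.out.println("forward:   " + downstreamWatermark(events, false, 5_000L));
        System.out.println("rebalance: " + downstreamWatermark(events, true, 5_000L));
    }
}
```

Under these assumptions the forward layout leaves the downstream watermark at `Long.MIN_VALUE`, while the rebalanced layout reaches 25000. The trade-off is that after a rebalance each assigner sees records interleaved from several shards, so the out-of-orderness bound has to cover disorder across shards, not just within one.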


RE: EXT :Re: How to debug difference between Kinesis and Kafka

Posted by "Martin, Nick" <Ni...@ngc.com>.
Yeah, that’s expected/known. Watermarks for the empty partition don’t advance, so the window in your window function never closes.

There’s a ticket open to fix it (https://issues.apache.org/jira/browse/FLINK-5479) for the Kafka connector, but in general, any time one parallel instance of a source function isn’t getting data, you have to watch out for this.
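Besides waiting for a connector-level fix, one hand-rolled approach is a periodic assigner that lets its watermark follow processing time once its instance has been silent for a while. The sketch below is a hypothetical, stdlib-only model of that idea: `IdleFallbackAssigner` is not a Flink class, and the clock is passed in explicitly so the behaviour can be tested. It assumes event timestamps roughly track wall-clock time; if a silent shard later resumes with older timestamps, those events will arrive behind the watermark and be treated as late.

```java
/**
 * Hypothetical sketch (not Flink code) of a periodic watermark assigner that falls back
 * to processing time when its instance has received no events for idleTimeoutMs.
 * Assumes event timestamps roughly track wall-clock time.
 */
final class IdleFallbackAssigner {
    private final long boundMs;
    private final long idleTimeoutMs;
    private long maxTimestamp;
    private long lastEventAtMs;
    private long lastWatermark = Long.MIN_VALUE;

    IdleFallbackAssigner(long boundMs, long idleTimeoutMs, long createdAtMs) {
        this.boundMs = boundMs;
        this.idleTimeoutMs = idleTimeoutMs;
        this.maxTimestamp = Long.MIN_VALUE + boundMs; // avoids underflow in maxTimestamp - boundMs
        this.lastEventAtMs = createdAtMs;
    }

    /** Records an event and the processing time at which it arrived. */
    void onEvent(long eventTimestampMs, long nowMs) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestampMs);
        lastEventAtMs = nowMs;
    }

    /** Periodic watermark; never moves backwards. */
    long currentWatermark(long nowMs) {
        long candidate = maxTimestamp - boundMs;
        if (nowMs - lastEventAtMs >= idleTimeoutMs) {
            // Idle: let the watermark follow the wall clock instead of stalling.
            candidate = Math.max(candidate, nowMs - boundMs);
        }
        lastWatermark = Math.max(lastWatermark, candidate);
        return lastWatermark;
    }

    public static void main(String[] args) {
        // 5 s out-of-orderness bound, fall back after 60 s without events; clock starts at 0.
        IdleFallbackAssigner assigner = new IdleFallbackAssigner(5_000L, 60_000L, 0L);
        System.out.println("t=30s: " + assigner.currentWatermark(30_000L)); // still Long.MIN_VALUE
        System.out.println("t=60s: " + assigner.currentWatermark(60_000L)); // follows the clock: 55000
    }
}
```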

From: Stephen Connolly [mailto:stephen.alan.connolly@gmail.com]
Sent: Tuesday, February 19, 2019 6:32 AM
To: user <us...@flink.apache.org>
Subject: EXT :Re: How to debug difference between Kinesis and Kafka

Hmmm, my suspicions are now quite high. I created a file source that just replays the events straight, and then I get more results...

On Tue, 19 Feb 2019 at 11:50, Stephen Connolly <st...@gmail.com> wrote:
Hmmm, after expanding the dataset so that some data ended up on shard-0 (everything in my original dataset was coincidentally landing on shard-1), I am now getting output... Should I expect this kind of behaviour if no data ever arrives at shard-0?

________________________________
Notice: This e-mail is intended solely for use of the individual or entity to which it is addressed and may contain information that is proprietary, privileged and/or exempt from disclosure under applicable law. If the reader is not the intended recipient or agent responsible for delivering the message to the intended recipient, you are hereby notified that any dissemination, distribution or copying of this communication is strictly prohibited. This communication may also contain data subject to U.S. export laws. If so, data subject to the International Traffic in Arms Regulation cannot be disseminated, distributed, transferred, or copied, whether incorporated or in its original form, to foreign nationals residing in the U.S. or abroad, absent the express prior approval of the U.S. Department of State. Data subject to the Export Administration Act may not be disseminated, distributed, transferred or copied contrary to U. S. Department of Commerce regulations. If you have received this communication in error, please notify the sender by reply e-mail and destroy the e-mail message and any physical copies made of the communication.
 Thank you.
*********************



Re: How to debug difference between Kinesis and Kafka

Posted by Congxian Qiu <qc...@gmail.com>.
Hi Stephen

If the window has never been triggered, you could investigate the watermarks; the docs [1][2] may be helpful.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html#interaction-of-watermarks-and-windows
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#event-time-and-watermarks

Best, Congxian
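When investigating the watermarks as suggested, the per-subtask values are what matter: Flink reports them in the web UI and through per-operator metrics such as `currentInputWatermark` and `currentOutputWatermark`. The helper below is a hypothetical, stdlib-only sketch for interpreting such readings once collected. It renders `Long.MIN_VALUE` as "no watermark yet" and lists the subtasks whose watermark equals the minimum, i.e. the ones holding event time back. How the readings are gathered is left open.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical debugging helper: interprets per-subtask watermark readings. */
final class WatermarkDiagnostics {

    private WatermarkDiagnostics() {}

    /** Renders a watermark, treating Long.MIN_VALUE as "no watermark yet". */
    static String describe(long watermark) {
        return watermark == Long.MIN_VALUE ? "none yet (Long.MIN_VALUE)" : Long.toString(watermark);
    }

    /** Indices of the subtasks whose watermark equals the minimum, i.e. those holding event time back. */
    static List<Integer> laggingSubtasks(long[] watermarks) {
        long min = Long.MAX_VALUE;
        for (long wm : watermarks) {
            min = Math.min(min, wm);
        }
        List<Integer> lagging = new ArrayList<>();
        for (int i = 0; i < watermarks.length; i++) {
            if (watermarks[i] == min) {
                lagging.add(i);
            }
        }
        return lagging;
    }

    public static void main(String[] args) {
        long[] observed = {Long.MIN_VALUE, 55_000L}; // hypothetical readings for subtasks 0 and 1
        for (int i = 0; i < observed.length; i++) {
            System.out.println("subtask " + i + ": " + describe(observed[i]));
        }
        System.out.println("holding back event time: " + laggingSubtasks(observed));
    }
}
```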
On Feb 19, 2019, 21:31 +0800, Stephen Connolly <st...@gmail.com>, wrote:
> Hmmm my suspicions are now quite high. I created a file source that just replays the events straight then I get more results....
>
> > On Tue, 19 Feb 2019 at 11:50, Stephen Connolly <st...@gmail.com> wrote:
> > > Hmmm after expanding the dataset such that there was additional data that ended up on shard-0 (everything in my original dataset was coincidentally landing on shard-1) I am now getting output... should I expect this kind of behaviour if no data arrives at shard-0 ever?
> > >
> > > > On Tue, 19 Feb 2019 at 11:14, Stephen Connolly <st...@gmail.com> wrote:
> > > > > Hi, I’m having a strange situation and I would like to know where I should start trying to debug.
> > > > >
> > > > > I have set up a configurable swap in source, with three implementations:
> > > > >
> > > > > 1. A mock implementation
> > > > > 2. A Kafka consumer implementation
> > > > > 3. A Kinesis consumer implementation
> > > > >
> > > > > From injecting a log and no-op map function I can see that all three sources pass through the events correctly.
> > > > >
> > > > > I then have a window based on event time stamps… and from inspecting the aggregation function I can see that the data is getting aggregated…, I’m using the `.aggregate(AggregateFunction.WindowFunction)` variant so that I can retrieve the key
> > > > >
> > > > > Here’s the strange thing, I only change the source (and each source uses the same deserialization function) but:
> > > > >
> > > > >
> > > > > • When I use either Kafka or my Mock source, the WindowFunction gets called as events pass the end of the window
> > > > > • When I use the Kinesis source, however, the window function never gets called. I have even tried injecting events into kinesis with really high timestamps to flush the watermarks in my BoundedOutOfOrdernessTimestampExtractor... but nothing
> > > > >
> > > > > I cannot see how this source switching could result in such different behaviour:
> > > > >
> > > > >         Properties sourceProperties = new Properties();
> > > > >         ConsumerFactory sourceFactory;
> > > > >         String sourceName = configParams.getRequired("source");
> > > > >         switch (sourceName.toLowerCase(Locale.ENGLISH)) {
> > > > >             case "kinesis":
> > > > >                 sourceFactory = FlinkKinesisConsumer::new;
> > > > >                 copyOptionalArg(configParams, "aws-region", sourceProperties, AWSConfigConstants.AWS_REGION);
> > > > >                 copyOptionalArg(configParams, "aws-endpoint", sourceProperties, AWSConfigConstants.AWS_ENDPOINT);
> > > > >                 copyOptionalArg(configParams, "aws-access-key", sourceProperties, AWSConfigConstants.AWS_ACCESS_KEY_ID);
> > > > >                 copyOptionalArg(configParams, "aws-secret-key", sourceProperties, AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
> > > > >                 copyOptionalArg(configParams, "aws-profile", sourceProperties, AWSConfigConstants.AWS_PROFILE_NAME);
> > > > >                 break;
> > > > >             case "kafka":
> > > > >                 sourceFactory = FlinkKafkaConsumer010::new;
> > > > >                 copyRequiredArg(configParams, "bootstrap-server", sourceProperties, "bootstrap.servers");
> > > > >                 copyOptionalArg(configParams, "group-id", sourceProperties, "group.id");
> > > > >                 break;
> > > > >             case "mock":
> > > > >                 sourceFactory = MockSourceFunction::new;
> > > > >                 break;
> > > > >             default:
> > > > >                 throw new RuntimeException("Unknown source '" + sourceName + '\'');
> > > > >         }
> > > > >
> > > > >         // set up the streaming execution environment
> > > > >         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> > > > >
> > > > >         // poll watermark every second because using BoundedOutOfOrdernessTimestampExtractor
> > > > >         env.getConfig().setAutoWatermarkInterval(1000L);
> > > > >         env.enableCheckpointing(5000);
> > > > >
> > > > >         SplitStream<JsonNode> eventsByType = env.addSource(sourceFactory.create(
> > > > >                 configParams.getRequired("topic"),
> > > > >                 new ObjectNodeDeserializationSchema(),
> > > > >                 sourceProperties
> > > > >         ))
> > > > >                 .returns(ObjectNode.class) // the use of ConsumerFactory erases the type info so add it back
> > > > >                 .name("raw-events")
> > > > >                 .assignTimestampsAndWatermarks(
> > > > >                         new ObjectNodeBoundedOutOfOrdernessTimestampExtractor("timestamp", Time.seconds(5))
> > > > >                 )
> > > > >                 .split(new JsonNodeOutputSelector("eventType"));
> > > > > ...
> > > > >         eventsByType.select(...)
> > > > >                 .keyBy(new JsonNodeStringKeySelector("_key"))
> > > > >                 .window(TumblingEventOffsetPerKeyEventTimeWindows.of(Time.seconds(windowDuration),
> > > > >                         (KeySelector<JsonNode, Time>) TasksMain::offsetPerMaster))
> > > > >                 .trigger(EventTimeTrigger.create())
> > > > >                 .aggregate(new CountsAggregator<>(), new KeyTagger<>()) // <==== The CountsAggregator is seeing the data
> > > > >                 .print(); // <==== HERE is where we get no output from Kinesis... but Kafka and my Mock are just fine!
> > > > >
> > > > >

Re: How to debug difference between Kinesis and Kafka

Posted by Stephen Connolly <st...@gmail.com>.
On Thu, 21 Feb 2019 at 14:00, Dawid Wysakowicz <dw...@apache.org>
wrote:

> If an event arrived at the WindowOperator before the Watermark, then it will
> be accounted for in the window aggregation and put in state. Once that state
> gets checkpointed, this same event won't be processed again. In other words,
> if a checkpoint succeeds, elements that produced the corresponding state
> won't be processed again. You may want to read these docs for further
> understanding[1].
>
> What I meant by reprocessing is when you want to reprocess the same input
> records, e.g. you want to rerun your job on data from a past week. This
> computation might produce different results than the original ones,
> because Watermarks might get generated after different elements as they
> are bound by "ProcessingTime".
>
Ahh, that clarifies it. Nope, we are processing the stream of events Taylor
Swift style... a.k.a. "We are never processing them again, like ever".

The stream of events is too much data to retain in full. Kinesis will just
keep 1 day's history for recovery.

I'd read [1] before, but when you mentioned "you might get different
results in case of reprocessing" I started to think that maybe the
Watermarks are the checkpoint barriers. After your clarification, I'm back
to thinking they are separate, similar mechanisms operating in the stream.
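The crash/recover sequence Stephen asks about can be sketched with a toy model. It assumes a replayable source (Kinesis and Kafka both allow reading from a stored position) and exactly-once checkpointing. The key idea from Dawid's explanation is that the source position is snapshotted together with the window state. After recovery, the source resumes after Event 1, and Event 1's contribution comes back only through the restored state. All names are hypothetical, and a single counter stands in for the window state; real Flink coordinates this across operators with checkpoint barriers.

```java
import java.util.List;

/**
 * Hypothetical toy model (not Flink API): the source position and the window
 * aggregate are snapshotted together, as a checkpoint would do.
 */
final class CheckpointModel {
    /** A consistent snapshot: next offset to read plus the aggregate so far. */
    static final class Snapshot {
        final int nextOffset;
        final long windowCount;

        Snapshot(int nextOffset, long windowCount) {
            this.nextOffset = nextOffset;
            this.windowCount = windowCount;
        }
    }

    private final List<String> log; // replayable input, e.g. one shard's records
    private int nextOffset = 0;
    private long windowCount = 0;   // stands in for the window aggregation state

    CheckpointModel(List<String> log) {
        this.log = log;
    }

    /** Consume and aggregate up to n records from the current position. */
    void process(int n) {
        for (int i = 0; i < n && nextOffset < log.size(); i++) {
            nextOffset++;
            windowCount++;
        }
    }

    Snapshot checkpoint() {
        return new Snapshot(nextOffset, windowCount);
    }

    /** Recovery restores the state AND rewinds the source to the same point. */
    void restore(Snapshot s) {
        nextOffset = s.nextOffset;
        windowCount = s.windowCount;
    }

    long windowCount() {
        return windowCount;
    }

    public static void main(String[] args) {
        CheckpointModel job = new CheckpointModel(List.of("event1", "event2", "event3"));
        job.process(1);                  // Event 1 goes into the aggregation
        Snapshot cp = job.checkpoint();  // state (including Event 1) is checkpointed
        job.process(1);                  // event2 is aggregated, then the job crashes
        job.restore(cp);                 // recover from the last checkpoint
        job.process(Integer.MAX_VALUE);  // only event2 and event3 are re-read
        System.out.println(job.windowCount()); // 3: every event counted exactly once
    }
}
```

This only covers Flink's internal state. Whether results already emitted to a sink before the crash show up twice depends on the sink.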


> Best,
>
> Dawid
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/internals/stream_checkpointing.html#checkpointing
> On 21/02/2019 14:42, Stephen Connolly wrote:
>
>
>
> On Thu, 21 Feb 2019 at 13:36, Dawid Wysakowicz <dw...@apache.org>
> wrote:
>
>> It is definitely a solution ;)
>>
>> You should be aware of the downsides though:
>>
>>    - you might get different results in case of reprocessing
>>    - you might drop some data as late, due to some delays in processing,
>>    if the events arrive later than the "ProcessingTime" threshold
>>
> So I have a separate stream processor from the "late" side of my window
> that works out what the update is.
>
> But I guess the question I have is around what happens with reprocessing.
>
> 1. Event 1 goes into the window aggregation because it is before the
> watermark
>
> 2. State gets checkpointed
>
> 3. Crash
>
> 4. Recover
>
> Will Event 1 now go to the late stream, or will it be tagged as having been
> included in the state in the checkpoint?
>
> I don't mind if Event 1 gets included in the window's "create event count
> for timebox" output or the "update event count for timebox from late
> events" output as long as it is always one and only one of those paths.
>
>
>>
>>
>> Best,
>>
>> Dawid
>> On 21/02/2019 14:18, Stephen Connolly wrote:
>>
>> Yes, it was the "watermarks for event time when no events for that shard"
>> problem.
>>
>> I am now investigating whether we can use a blended watermark of
>> max(lastEventTimestamp - 1min, System.currentTimeMillis() - 5min) to ensure
>> idle shards do not cause excessive data retention.
>>
>> Is that the best solution?
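Here is a minimal sketch of the proposed blended watermark, max(lastEventTimestamp - 1min, System.currentTimeMillis() - 5min), in plain Java. In a Flink 1.7 job, this logic would presumably sit in `getCurrentWatermark()` of a custom `AssignerWithPeriodicWatermarks`. The class name and the injectable clock (used instead of `System.currentTimeMillis()` so that it can be tested) are assumptions. The sketch tracks the largest timestamp seen rather than the literal last one, and never lets the watermark move backwards.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical helper for the blended watermark
 * max(lastEventTimestamp - 1min, System.currentTimeMillis() - 5min).
 */
final class BlendedWatermark {
    static final long EVENT_LAG_MS = 60_000;     // the "- 1min" on event time
    static final long IDLE_LAG_MS = 5 * 60_000;  // the "- 5min" on wall-clock time

    private final LongSupplier clock; // e.g. System::currentTimeMillis in production
    private long maxEventTimestamp = Long.MIN_VALUE;
    private long lastEmitted = Long.MIN_VALUE;

    BlendedWatermark(LongSupplier clock) {
        this.clock = clock;
    }

    void onEvent(long timestampMs) {
        maxEventTimestamp = Math.max(maxEventTimestamp, timestampMs);
    }

    /** Called periodically; the result never moves backwards. */
    long currentWatermark() {
        long fromEvents = maxEventTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxEventTimestamp - EVENT_LAG_MS;
        long fromClock = clock.getAsLong() - IDLE_LAG_MS;
        lastEmitted = Math.max(lastEmitted, Math.max(fromEvents, fromClock));
        return lastEmitted;
    }

    public static void main(String[] args) {
        long[] now = {1_000_000L};
        BlendedWatermark wm = new BlendedWatermark(() -> now[0]);
        System.out.println(wm.currentWatermark()); // 700000: idle shard, clock term only
        now[0] += 10 * 60_000;                     // ten minutes pass with no data
        System.out.println(wm.currentWatermark()); // 1300000: still advances
    }
}
```

The idle shard's watermark now advances with the wall clock. That also makes it depend on processing time: an event processed more than roughly five minutes after its timestamp can fall behind the watermark and be treated as late.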
>>
>> On Thu, 21 Feb 2019 at 08:30, Dawid Wysakowicz <dw...@apache.org>
>> wrote:
>>
>>> Hi Stephen,
>>>
>>> The Watermark of a single operator is the minimum of the Watermarks received
>>> from all its inputs. Therefore, if one of your shards/operators has no
>>> incoming data, it will not produce Watermarks, and thus the Watermark of the
>>> WindowOperator will not progress. So this is, in a sense, expected behavior.
>>>
>>> I recommend reading the docs linked by Congxian, especially this
>>> section[1].
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#watermarks-in-parallel-streams
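The rule Dawid describes can be modelled in a few lines. The class below is a hypothetical plain-Java illustration, not Flink's internal implementation. Each input stands for the parallel source/extractor instance reading one shard and keeps the latest watermark it has delivered. The operator's watermark is the minimum across inputs, so an input that never delivers one keeps the operator at its initial value.

```java
import java.util.Arrays;

/**
 * Hypothetical model: an operator's event-time clock is the minimum of the
 * latest watermark received on each of its input channels.
 */
final class MinWatermarkOperator {
    private final long[] inputWatermarks;

    MinWatermarkOperator(int numInputs) {
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE); // nothing received yet
    }

    /** Record a watermark from one input; per-input watermarks only move forward. */
    void onWatermark(int input, long watermark) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
    }

    long currentWatermark() {
        return Arrays.stream(inputWatermarks).min().getAsLong();
    }

    public static void main(String[] args) {
        MinWatermarkOperator window = new MinWatermarkOperator(2); // shard-0, shard-1
        window.onWatermark(1, 1_000_000_000L); // shard-1 busy, very high timestamps
        System.out.println(window.currentWatermark() == Long.MIN_VALUE); // true: shard-0 idle
        window.onWatermark(0, 5_000L);         // first data reaches shard-0
        System.out.println(window.currentWatermark()); // 5000
    }
}
```

This matches what Stephen observed: injecting very high timestamps into shard-1 did not move the window operator until some data reached shard-0.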

Re: How to debug difference between Kinesis and Kafka

Posted by Dawid Wysakowicz <dw...@apache.org>.
If an event arrived at the WindowOperator before the Watermark, then it will
be accounted for in the window aggregation and put in state. Once that state
gets checkpointed, this same event won't be processed again. In other
words, if a checkpoint succeeds, elements that produced the corresponding
state won't be processed again. You may want to read these docs for
further understanding[1].

What I meant by reprocessing is when you want to reprocess the same
input records, e.g. you want to rerun your job on data from a past
week. This computation might produce different results than the
original ones, because Watermarks might get generated after different
elements as they are bound by "ProcessingTime".

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/internals/stream_checkpointing.html#checkpointing
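To make the reprocessing caveat concrete, assume the blended watermark discussed in this thread, max(lastEventTimestamp - 1min, System.currentTimeMillis() - 5min), and a plain one-minute tumbling window. The sketch ignores the per-key offsets of the original job. It classifies the same record against the watermark computed when the record is processed live and again when it is replayed a week later. Lateness is checked as for a window with zero allowed lateness: the record is late if its window's last timestamp (end - 1) is at or below the watermark. All names are hypothetical.

```java
/**
 * Hypothetical sketch: the same record is on time when processed live but
 * late when replayed, because the watermark includes a wall-clock term.
 */
final class ReplayLateness {
    static final long EVENT_LAG_MS = 60_000;     // lastEventTimestamp - 1min
    static final long IDLE_LAG_MS = 5 * 60_000;  // currentTimeMillis() - 5min

    static long blendedWatermark(long maxEventTs, long nowMs) {
        return Math.max(maxEventTs - EVENT_LAG_MS, nowMs - IDLE_LAG_MS);
    }

    /** Late if the watermark has already reached the window's last timestamp. */
    static boolean isLate(long eventTs, long windowSizeMs, long watermark) {
        long windowStart = eventTs - Math.floorMod(eventTs, windowSizeMs);
        long windowMaxTs = windowStart + windowSizeMs - 1;
        return windowMaxTs <= watermark;
    }

    public static void main(String[] args) {
        long eventTs = 1_550_750_000_000L;           // some record's timestamp
        long oneWeek = 7L * 24 * 60 * 60 * 1000;
        long live = blendedWatermark(eventTs, eventTs + 2_000);     // 2 s after the event
        long replay = blendedWatermark(eventTs, eventTs + oneWeek); // a week later
        System.out.println(isLate(eventTs, 60_000, live));   // false
        System.out.println(isLate(eventTs, 60_000, replay)); // true
    }
}
```

During the replay, the wall-clock term dominates, so the record's window has already been passed. The sketch also assumes a watermark has already been emitted when the record is processed; with periodic watermarks, the very first records of a replay may slip in before that.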


Re: How to debug difference between Kinesis and Kafka

Posted by Stephen Connolly <st...@gmail.com>.
On Thu, 21 Feb 2019 at 13:36, Dawid Wysakowicz <dw...@apache.org>
wrote:

> It is definitely a solution ;)
>
> You should be aware of the downsides though:
>
>    - you might get different results in case of reprocessing
>    - you might drop some data as late, due to some delays in processing,
>    if the events arrive later than the "ProcessingTime" threshold
>
So I have a separate stream processor from the "late" side of my window
that works out what the update is.

But I guess the question I have is around what happens with reprocessing.

1. Event 1 goes into the window aggregation because it is before the
watermark

2. State gets checkpointed

3. Crash

4. Recover

Will Event 1 now go to the late stream, or will it be tagged as having been
included in the state in the checkpoint?

I don't mind if Event 1 gets included in the window's "create event count
for timebox" output or the "update event count for timebox from late
events" output as long as it is always one and only one of those paths.
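Here is one way to see the "one and only one path" property. With zero allowed lateness, each element is compared with the watermark current when it reaches the window operator, and it goes either into its window or to the late output, never both. In Flink this split is what `sideOutputLateData(OutputTag)` on the windowed stream provides, with the late elements read back via `getSideOutput(...)`. The plain-Java model below is a hypothetical illustration of that routing rule, not Flink code.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of splitting events between the window aggregation and
 * a late stream: each event goes to exactly one of the two outputs.
 */
final class LateRouter {
    final List<Long> windowed = new ArrayList<>();
    final List<Long> late = new ArrayList<>();
    private final long windowSizeMs;

    LateRouter(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Route by comparing the event's window max timestamp to the watermark
     *  current when the event is processed (allowed lateness of zero). */
    void route(long eventTs, long currentWatermark) {
        long windowStart = eventTs - Math.floorMod(eventTs, windowSizeMs);
        long windowMaxTs = windowStart + windowSizeMs - 1;
        if (windowMaxTs <= currentWatermark) {
            late.add(eventTs);      // window already fired: "update from late events" path
        } else {
            windowed.add(eventTs);  // window still open: "create event count" path
        }
    }

    public static void main(String[] args) {
        LateRouter router = new LateRouter(60_000);
        router.route(30_000, 10_000);   // window [0s, 60s) still open
        router.route(45_000, 70_000);   // watermark already past 59.999s: late
        System.out.println(router.windowed + " " + router.late); // [30000] [45000]
    }
}
```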



Re: How to debug difference between Kinesis and Kafka

Posted by Dawid Wysakowicz <dw...@apache.org>.
It is definitely a solution ;)

You should be aware of the downsides though:

  * you might get different results in case of reprocessing
  * you might drop some data as late, due to some delays in processing,
    if the events arrive later than the "ProcessingTime" threshold

Best,

Dawid

On 21/02/2019 14:18, Stephen Connolly wrote:
> Yes, it was the "watermarks for event time when no events for that
> shard" problem.
>
> I am now investigating whether we can use a blended watermark of
> max(lastEventTimestamp - 1min, System.currentTimeMillis() - 5min) to
> ensure idle shards do not cause excessive data retention.
>
> Is that the best solution?
>
> On Thu, 21 Feb 2019 at 08:30, Dawid Wysakowicz <dwysakowicz@apache.org
> <ma...@apache.org>> wrote:
>
>     Hi Stephen,
>
>     The Watermark of a single operator is the minimum of the
>     Watermarks received from all of its inputs. Therefore, if one of
>     your shards/operators has no incoming data, it will not produce
>     Watermarks, and thus the Watermark of the WindowOperator will not
>     progress. So this is, more or less, expected behavior.
>
>     I recommend reading the docs linked by Congxian, especially this
>     section[1].
>
>     Best,
>
>     Dawid
>
>     [1]
>     https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#watermarks-in-parallel-streams
>
>     On 19/02/2019 14:31, Stephen Connolly wrote:
>>     Hmmm, my suspicions are now quite high. I created a file source
>>     that just replays the events straight, and with it I get more
>>     results...
>>
>>     On Tue, 19 Feb 2019 at 11:50, Stephen Connolly
>>     <stephen.alan.connolly@gmail.com> wrote:
>>
>>         Hmmm, after expanding the dataset so that some additional
>>         data ended up on shard-0 (everything in my original dataset
>>         was coincidentally landing on shard-1), I am now getting
>>         output... Should I expect this kind of behaviour if no data
>>         ever arrives at shard-0?

Re: How to debug difference between Kinesis and Kafka

Posted by Stephen Connolly <st...@gmail.com>.
Yes, it was the "watermarks for event time when no events for that shard"
problem.

I am now investigating whether we can use a blended watermark of
max(lastEventTimestamp - 1min, System.currentTimeMillis() - 5min) to ensure
idle shards do not cause excessive data retention.

Is that the best solution?
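
A rough sketch of that blended watermark, in plain Java with an injectable clock (a hypothetical LongSupplier parameter) so it can be exercised without a cluster. In Flink 1.7 the same logic could presumably sit behind AssignerWithPeriodicWatermarks#getCurrentWatermark(), with onEvent called from extractTimestamp; that wiring is an assumption, not something confirmed in this thread. It tracks the maximum timestamp seen rather than the literal last one, the way BoundedOutOfOrdernessTimestampExtractor does.

```java
import java.util.function.LongSupplier;

/**
 * Sketch of the blended watermark proposed above:
 *   max(maxEventTimestamp - eventLag, clock - idleLag)
 * The wall-clock term keeps the watermark moving while an input is idle.
 */
final class BlendedWatermark {
    private final long eventLagMs;
    private final long idleLagMs;
    private final LongSupplier clock;  // e.g. System::currentTimeMillis in production
    private long maxEventTimestamp = Long.MIN_VALUE;
    private long lastWatermark = Long.MIN_VALUE;

    BlendedWatermark(long eventLagMs, long idleLagMs, LongSupplier clock) {
        this.eventLagMs = eventLagMs;
        this.idleLagMs = idleLagMs;
        this.clock = clock;
    }

    /** Record an event timestamp (the role extractTimestamp would play). */
    void onEvent(long eventTimestamp) {
        maxEventTimestamp = Math.max(maxEventTimestamp, eventTimestamp);
    }

    /** Periodic watermark (the role getCurrentWatermark would play). */
    long currentWatermark() {
        long fromEvents = (maxEventTimestamp == Long.MIN_VALUE)
                ? Long.MIN_VALUE                  // no events yet: avoid underflow
                : maxEventTimestamp - eventLagMs;
        long fromClock = clock.getAsLong() - idleLagMs;
        // Never let the watermark go backwards.
        lastWatermark = Math.max(lastWatermark, Math.max(fromEvents, fromClock));
        return lastWatermark;
    }
}
```

With a 1-minute event lag and a 5-minute idle lag, an input that receives nothing still advances to now - 5min, so windows fed by the busy shard can eventually fire.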

On Thu, 21 Feb 2019 at 08:30, Dawid Wysakowicz <dw...@apache.org>
wrote:

> Hi Stephen,
>
> The Watermark of a single operator is the minimum of the Watermarks
> received from all of its inputs. Therefore, if one of your shards/operators
> has no incoming data, it will not produce Watermarks, and thus the
> Watermark of the WindowOperator will not progress. So this is, more or
> less, expected behavior.
>
> I recommend reading the docs linked by Congxian, especially this
> section[1].
>
> Best,
>
> Dawid
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#watermarks-in-parallel-streams
>
> On 19/02/2019 14:31, Stephen Connolly wrote:
>
> Hmmm, my suspicions are now quite high. I created a file source that just
> replays the events straight, and with it I get more results...
>
> On Tue, 19 Feb 2019 at 11:50, Stephen Connolly <
> stephen.alan.connolly@gmail.com> wrote:
>
>> Hmmm, after expanding the dataset so that some additional data ended up
>> on shard-0 (everything in my original dataset was coincidentally landing
>> on shard-1), I am now getting output... Should I expect this kind of
>> behaviour if no data ever arrives at shard-0?

Re: How to debug difference between Kinesis and Kafka

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi Stephen,

The Watermark of a single operator is the minimum of the Watermarks
received from all of its inputs. Therefore, if one of your shards/operators
has no incoming data, it will not produce Watermarks, and thus the
Watermark of the WindowOperator will not progress. So this is, more or
less, expected behavior.
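
To make the rule concrete, here is a hypothetical model in plain Java (not Flink's internal classes): an operator with two inputs standing in for shard-0 and shard-1, where an input that has never emitted a watermark sits at Long.MIN_VALUE.

```java
import java.util.Arrays;

/**
 * Hypothetical model of how a downstream operator combines watermarks: its
 * event-time clock is the minimum of the latest watermark seen on each input.
 */
final class CombinedWatermark {
    private final long[] perInput;

    CombinedWatermark(int numInputs) {
        perInput = new long[numInputs];
        // An input that has never emitted a watermark means "no progress yet".
        Arrays.fill(perInput, Long.MIN_VALUE);
    }

    /** Records a watermark from one input; per-input watermarks only move forward. */
    void onWatermark(int input, long watermark) {
        perInput[input] = Math.max(perInput[input], watermark);
    }

    /** The operator's current watermark: the slowest input wins. */
    long current() {
        long min = Long.MAX_VALUE;
        for (long w : perInput) {
            min = Math.min(min, w);
        }
        return min;
    }
}
```

Pushing a very large watermark through shard-1 alone leaves current() at Long.MIN_VALUE, which matches the experiment of injecting events with very high timestamps into Kinesis; only once shard-0 reports something does the combined watermark move.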

I recommend reading the docs linked by Congxian, especially this section[1].

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#watermarks-in-parallel-streams

On 19/02/2019 14:31, Stephen Connolly wrote:
> Hmmm, my suspicions are now quite high. I created a file source that
> just replays the events straight, and with it I get more results...
>
> On Tue, 19 Feb 2019 at 11:50, Stephen Connolly
> <stephen.alan.connolly@gmail.com> wrote:
>
>     Hmmm, after expanding the dataset so that some additional data
>     ended up on shard-0 (everything in my original dataset was
>     coincidentally landing on shard-1), I am now getting output...
>     Should I expect this kind of behaviour if no data ever arrives at
>     shard-0?

Re: How to debug difference between Kinesis and Kafka

Posted by Stephen Connolly <st...@gmail.com>.
Hmmm, my suspicions are now quite high. I created a file source that just
replays the events straight, and with it I get more results...

On Tue, 19 Feb 2019 at 11:50, Stephen Connolly <
stephen.alan.connolly@gmail.com> wrote:

> Hmmm, after expanding the dataset so that some additional data ended up
> on shard-0 (everything in my original dataset was coincidentally landing
> on shard-1), I am now getting output... Should I expect this kind of
> behaviour if no data ever arrives at shard-0?

Re: How to debug difference between Kinesis and Kafka

Posted by Stephen Connolly <st...@gmail.com>.
Hmmm, after expanding the dataset so that some additional data ended up
on shard-0 (everything in my original dataset was coincidentally landing
on shard-1), I am now getting output... Should I expect this kind of
behaviour if no data ever arrives at shard-0?
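
Why the aggregator saw data while the window function never ran can be sketched with a simplified, hypothetical window in plain Java: elements are folded into the accumulator as they arrive, but the result is only emitted once the watermark reaches the window's last millisecond (end - 1), which is when Flink's EventTimeTrigger fires. The sketch assumes an aligned window and ignores the per-key offsets from TasksMain::offsetPerMaster as well as the real operator's timers and keyed state.

```java
/**
 * Simplified, hypothetical model of one event-time tumbling window with an
 * incremental count aggregate and an EventTimeTrigger-style firing rule.
 */
final class CountingTumblingWindow {
    private final long start;
    private final long end;      // exclusive
    private long count;          // accumulator, updated on every element
    private boolean fired;

    CountingTumblingWindow(long start, long sizeMs) {
        this.start = start;
        this.end = start + sizeMs;
    }

    /** Like AggregateFunction.add: runs as soon as each element arrives. */
    void add(long eventTs) {
        if (eventTs < start || eventTs >= end) {
            throw new IllegalArgumentException("event not in this window");
        }
        count++;
    }

    long maxTimestamp() {
        return end - 1;
    }

    long pendingCount() {
        return count;
    }

    /**
     * Called when the operator's watermark advances. The result (the
     * WindowFunction call in the original job) is produced only once the
     * watermark reaches maxTimestamp(). Returns the count, or -1 if not fired.
     */
    long onWatermark(long watermark) {
        if (!fired && watermark >= maxTimestamp()) {
            fired = true;
            return count;
        }
        return -1;
    }
}
```

If an idle shard keeps the operator's watermark at Long.MIN_VALUE, onWatermark never reaches maxTimestamp(), so the count keeps growing but nothing is emitted downstream.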
