Posted to user@flink.apache.org by Sara Arshad <sa...@gmail.com> on 2020/05/19 21:28:34 UTC

Question about My Flink Application

Hi,

I have been using Flink with Kinesis Analytics.
I have a stream of data, and I also need a cache that I update every 300
seconds.
To share the cache data with the Kinesis stream elements, I used a
broadcast stream: I implemented a SourceFunction that reads the data from
the DB and broadcasts it to the next operator, which is a
KeyedBroadcastProcessFunction.
But since I added the broadcast stream (the previous version had no
cache and used a KeyedProcessFunction for the Kinesis stream), the job
keeps restarting about every 20 minutes when I run it in Kinesis
Analytics.
Could you please help me figure out what the issue could be?

Best regards,
Sara Arshad

Re: Question about My Flink Application

Posted by Sara Arshad <sa...@gmail.com>.
Hi Alexander,

Thank you for your reply.
I got a reply from the AWS people. It seems to be a configuration problem.
But even if it ran without restarting, it would not be a good option for
us.
There is no one-to-one relation between cache data and keyed values.
Therefore, it would have to send the whole data set to every key every 5
minutes, and we may have a very large number of keys at the same time.
So I came up with a completely different solution. Now I only keep the
cache in a shared map. It may not be the best design, but it performs
better.
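
A minimal sketch of what such a shared-map cache could look like, in plain
Java without any Flink dependency. The class name SharedRuleCache, its
methods, and the loader argument are hypothetical and not taken from the
thread. The idea is to hold one immutable snapshot per JVM and swap it
atomically on each refresh, so readers never see a half-updated map.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical shared cache: one immutable snapshot, replaced atomically on refresh. */
final class SharedRuleCache<K, V> {
    private final AtomicReference<Map<K, V>> snapshot =
            new AtomicReference<>(Collections.<K, V>emptyMap());
    private final Supplier<Map<K, V>> loader; // e.g. a DB read, supplied by the caller

    SharedRuleCache(Supplier<Map<K, V>> loader) {
        this.loader = loader;
    }

    /** Loads a fresh copy and publishes it in a single atomic step. */
    void refresh() {
        snapshot.set(Collections.unmodifiableMap(new HashMap<>(loader.get())));
    }

    /** Returns the cached value, or null if the key is not in the current snapshot. */
    V get(K key) {
        return snapshot.get().get(key);
    }

    int size() {
        return snapshot.get().size();
    }

    /** Refreshes immediately, then every intervalSeconds on a daemon thread. */
    ScheduledExecutorService startPeriodicRefresh(long intervalSeconds) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "rule-cache-refresh");
            t.setDaemon(true);
            return t;
        });
        scheduler.scheduleAtFixedRate(() -> {
            try {
                refresh();
            } catch (RuntimeException e) {
                // Keep serving the previous snapshot; an uncaught exception
                // would cancel all further scheduled runs.
            }
        }, 0, intervalSeconds, TimeUnit.SECONDS);
        return scheduler;
    }
}
```

With a 300-second interval this would be started once per JVM via
startPeriodicRefresh(300). Note that a cache like this lives outside
Flink's checkpointed state, so it is simply reloaded after a restart.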

Best regards,
Sara



On Sat, May 23, 2020 at 1:04 PM Alexander Fedulov <al...@ververica.com>
wrote:

> Returning the discussion to the mailing list (it accidentally went to a
> side channel because of a direct reply).
> What I was referring to is the event-time processing semantics, which are
> based on the watermark mechanism [1].
> If you are using it, the event time at your KeyedBroadcastProcessFunction
> will be determined as the minimum of the maximum watermarks observed
> across all of the input channels. In order not to stall the processing of
> the events of the main data flow by the control channel (broadcast stream),
> you could set its watermark to the maximum possible value, as shown in
> this example [2].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html
> [2]
> https://github.com/afedulov/fraud-detection-demo/blob/master/flink-job/src/main/java/com/ververica/field/dynamicrules/sources/RulesSource.java#L80
>
> --
>
> Alexander Fedulov | Solutions Architect
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
>
>
> On Sat, May 23, 2020 at 1:05 AM Sara Arshad <sa...@gmail.com>
> wrote:
>
>> - It was based on something I read about the broadcast.
>> Besides, as I mentioned before, the restart happens when it's triggering
>> checkpoints.
>> - When I send the streams, it processes them perfectly fine between restarts.
>> - Yes, I am using ProcessingTimeService in the cache source to make it
>> get data every 300 seconds.
>> Do you think this should be doable with a stream of a million
>> messages if I improve my implementation?
>>
>> Best regards,
>> Sara
>>
>> On Fri, May 22, 2020 at 6:22 PM Alexander Fedulov <
>> alexander@ververica.com> wrote:
>>
>>> OK, with such data sizes this should definitely be doable with a
>>> broadcast channel.
>>> "The problem was that the broadcast puts a lot of pressure on
>>> checkpointing." - is this the evaluation of the AWS support? Do you have
>>> any details as to why this is considered to be the case?
>>> "Even before I start to send the Kinesis stream it stuck." - so do you
>>> actually see any data output or nothing is happening and 20 minutes later
>>> the job crashes?
>>> Are you using event time processing semantics in your pipeline?
>>>
>>> On Fri, May 22, 2020 at 4:34 PM Sara Arshad <sa...@gmail.com>
>>> wrote:
>>>
>>>> Hi Alexander,
>>>>
>>>> It's not that much data. I have only 2 records in my DynamoDB table right
>>>> now (later it can be around 100 records, which is still not much), and I
>>>> update the whole data set every 300 seconds.
>>>> Even before I start to send the Kinesis stream it stuck.
>>>> Yes, I can see the checkpoint size is around 150k. But in some cases,
>>>> when I sent a Kinesis stream of 80 messages, it was around 190k.
>>>> The maximum checkpoint duration is 670.
>>>>
>>>> Regards,
>>>>
>>>>
>>>> On Fri, 22 May 2020, 4:15 pm Alexander Fedulov, <
>>>> alexander@ververica.com> wrote:
>>>>
>>>>> Hi Sara,
>>>>>
>>>>> what is the volume of data that is coming in through the broadcast
>>>>> channel every 30 seconds? Do you only insert modified rules entries or all
>>>>> of them on each update?
>>>>> Do you have access to metrics? Specifically, the size of the
>>>>> checkpoints and time distribution of different checkpoint phases are of
>>>>> interest.
>>>>>
>>>>> Best,
>>>>>
>>>>> On Fri, May 22, 2020 at 3:57 PM Sara Arshad <sa...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> The problem was that the broadcast puts a lot of pressure on
>>>>>> checkpointing.
>>>>>> I have to find another solution.
>>>>>> If you have any other solution please let me know.
>>>>>>
>>>>>> Regards,
>>>>>> Sara
>>>>>>
>>>>>> On Wed, 20 May 2020, 5:55 pm Sara Arshad, <sa...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> That was the broadcast stream, which is supposed to behave like a
>>>>>>> cache.
>>>>>>> Then I connect it to the Kinesis stream as in the code below.
>>>>>>> Also, I have two sink functions that send the results to another
>>>>>>> DynamoDB table or to CloudWatch, depending on the output type.
>>>>>>> Does this make sense, or do you have another idea?
>>>>>>>
>>>>>>> DataStreamSource<MyRule> ruleDataStreamSource = env.addSource(new DynamoDBSource()).setParallelism(1);
>>>>>>>
>>>>>>> MapStateDescriptor<String, MyRule> ruleStateDescriptor = new MapStateDescriptor<>(
>>>>>>>         "RulesBroadcastState",
>>>>>>>         BasicTypeInfo.STRING_TYPE_INFO,
>>>>>>>         TypeInformation.of(new TypeHint<MyRule>() {
>>>>>>>         }));
>>>>>>>
>>>>>>> BroadcastStream<MyRule> ruleBroadcastStream = ruleDataStreamSource
>>>>>>>         .broadcast(ruleStateDescriptor);
>>>>>>>
>>>>>>> SingleOutputStreamOperator<WindowFunctionOutput> process = env.addSource(ObjectFactory.getKinesisConsumer())
>>>>>>>         .keyBy(new KeySelector<Message, Tuple2<Identifier, Integer>>() {
>>>>>>>
>>>>>>>             @Override
>>>>>>>             public Tuple2<Identifier, Integer> getKey(Message value) throws Exception {
>>>>>>>                 return Tuple2.of(value.getIdentifier(), value.getServiceType());
>>>>>>>             }
>>>>>>>
>>>>>>>         })
>>>>>>>         .connect(ruleBroadcastStream)
>>>>>>>         .process(new BroadcastWindowFunction());
>>>>>>>
>>>>>>> DataStreamSink<OutputX> blockingStrategyOutputDataStreamSink = process
>>>>>>>         .filter(output -> OutputX.class.isAssignableFrom(output.getClass()))
>>>>>>>         .map(output -> (OutputX) output)
>>>>>>>         .addSink(new DynamoDBSink());
>>>>>>>
>>>>>>>
>>>>>>> DataStreamSink<OutputY> metricsOutputDataStreamSink = process
>>>>>>>         .filter(output -> OutputY.class.isAssignableFrom(output.getClass()))
>>>>>>>         .map(output -> (OutputY) output)
>>>>>>>         .addSink(new CloudWatchMetricsSink());
>>>>>>>
>>>>>>>
>>>>>>> On Wed, May 20, 2020 at 5:45 PM Alexander Fedulov <
>>>>>>> alexander@ververica.com> wrote:
>>>>>>>
>>>>>>>> I did not notice that you are actually running the KinesisAnalytics
>>>>>>>> job, without access to the machines, sorry. In this case, without any
>>>>>>>> errors in the logs, I think there is not much that we can do without the
>>>>>>>> AWS support team looking into it.
>>>>>>>> Nonetheless, one thing I was wondering about is whether you
>>>>>>>> necessarily need to have a custom DynamoDBSource to fetch rules
>>>>>>>> periodically. How about directly connecting to the stream of DynamoDB
>>>>>>>> updates and getting everything in real time [1]
>>>>>>>> <https://issues.apache.org/jira/browse/FLINK-4582> ? This would
>>>>>>>> remove one moving part that, as I see, you suspect to be a potential source
>>>>>>>> of errors.
>>>>>>>>
>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-4582
>>>>>>>>
>>>>>>>> On Wed, May 20, 2020 at 4:38 PM Sara Arshad <
>>>>>>>> sara.arshad.86@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I am using Kinesis Analytics and the CloudWatch dashboard for logs.
>>>>>>>>> There are no error-level log entries.
>>>>>>>>> It logs things like 'Triggering checkpoint 187 @ 1589984558975
>>>>>>>>> for job 7998aca8913b33a090bc5c0f43168bd5.' and then suddenly
>>>>>>>>> restarts.
>>>>>>>>> I know this is very general, but I really don't know what's going
>>>>>>>>> on.
>>>>>>>>> I also asked AWS support. They haven't replied yet.
>>>>>>>>>
>>>>>>>>> This is my broadcast stream source:
>>>>>>>>>
>>>>>>>>> public class DynamoDBSource extends RichParallelSourceFunction<MyRule>
>>>>>>>>>         implements CheckpointedFunction, ProcessingTimeCallback {
>>>>>>>>>
>>>>>>>>>     private static final long serialVersionUID = 1L;
>>>>>>>>>
>>>>>>>>>     private volatile boolean isRunning = true;
>>>>>>>>>
>>>>>>>>>     ListStateDescriptor<MyRule> ruleStateDescriptor = new ListStateDescriptor<>(
>>>>>>>>>             "RulesBroadcastState",
>>>>>>>>>             TypeInformation.of(new TypeHint<MyRule>() {
>>>>>>>>>             }));
>>>>>>>>>
>>>>>>>>>     // Set by readRulesFromDB() to request a re-emit; guarded by "this".
>>>>>>>>>     // (A boxed Boolean must not be used as a lock: the lock object would
>>>>>>>>>     // change every time the field is reassigned.)
>>>>>>>>>     private boolean sendData = true;
>>>>>>>>>     private transient ListState<MyRule> listState;
>>>>>>>>>
>>>>>>>>>     private transient ProcessingTimeServiceInf processingTimeService;
>>>>>>>>>
>>>>>>>>>     private static long rulesUpdateIntervalMillis;
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public void open(Configuration parameters) throws Exception {
>>>>>>>>>
>>>>>>>>>         //...
>>>>>>>>>
>>>>>>>>>         processingTimeService = new ProcessingTimeServiceImpl();
>>>>>>>>>
>>>>>>>>>         long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
>>>>>>>>>
>>>>>>>>>         // rulesUpdateIntervalMillis = <some static value from config class>
>>>>>>>>>
>>>>>>>>>         // Fire the first timer immediately; it re-registers itself below.
>>>>>>>>>         processingTimeService.registerTimer(currentProcessingTime, this);
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public void snapshotState(FunctionSnapshotContext context) throws Exception {
>>>>>>>>>
>>>>>>>>>         Preconditions.checkState(this.listState != null,
>>>>>>>>>                 "The " + getClass().getSimpleName() + " has not been properly initialized.");
>>>>>>>>>
>>>>>>>>>         this.listState.clear();
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public void initializeState(FunctionInitializationContext context) throws Exception {
>>>>>>>>>
>>>>>>>>>         Preconditions.checkState(this.listState == null,
>>>>>>>>>                 "The " + getClass().getSimpleName() + " has already been initialized.");
>>>>>>>>>
>>>>>>>>>         this.listState = context.getOperatorStateStore().getListState(ruleStateDescriptor);
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public void run(SourceContext<MyRule> ctx) throws Exception {
>>>>>>>>>
>>>>>>>>>         // Note: this loop spins without sleeping between refreshes.
>>>>>>>>>         while (isRunning) {
>>>>>>>>>
>>>>>>>>>             // Same monitor as the synchronized readRulesFromDB() below.
>>>>>>>>>             synchronized (this) {
>>>>>>>>>
>>>>>>>>>                 if (sendData) {
>>>>>>>>>
>>>>>>>>>                     for (MyRule rule : listState.get()) {
>>>>>>>>>                         ctx.collect(rule);
>>>>>>>>>                     }
>>>>>>>>>                     sendData = false;
>>>>>>>>>                 }
>>>>>>>>>             }
>>>>>>>>>         }
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public void cancel() {
>>>>>>>>>         this.isRunning = false;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public void onProcessingTime(long timestamp) throws Exception {
>>>>>>>>>
>>>>>>>>>         readRulesFromDB();
>>>>>>>>>
>>>>>>>>>         // Schedule the next refresh one interval from now.
>>>>>>>>>         processingTimeService.registerTimer(
>>>>>>>>>                 processingTimeService.getCurrentProcessingTime() + rulesUpdateIntervalMillis,
>>>>>>>>>                 this);
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     private synchronized void readRulesFromDB() {
>>>>>>>>>
>>>>>>>>>          ...
>>>>>>>>>         this.sendData = true;
>>>>>>>>>     }
>>>>>>>>> }
>>>>>>>>>
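The run() loop in the source quoted above checks its flag continuously, which
keeps one thread busy between refreshes. As a sketch only, under the
assumption that a blocking wait is acceptable, the same "emit everything once
per interval until cancelled" behaviour can be written in plain Java with
wait/notify. PeriodicEmitter, its loader argument, and the Consumer standing
in for the source context are hypothetical names, not Flink API and not the
poster's code.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical periodic emitter: blocks between refreshes instead of spinning. */
final class PeriodicEmitter<T> {
    private final Object lock = new Object();
    private final Supplier<List<T>> loader; // e.g. a DB read, supplied by the caller
    private final long intervalMillis;      // must be greater than zero
    private boolean running = true;         // guarded by lock

    PeriodicEmitter(Supplier<List<T>> loader, long intervalMillis) {
        this.loader = loader;
        this.intervalMillis = intervalMillis;
    }

    /** Emits one full batch per interval until cancel() is called. */
    void run(Consumer<T> out) throws InterruptedException {
        synchronized (lock) {
            while (running) {
                for (T item : loader.get()) {
                    out.accept(item);
                }
                // Sleep until the next refresh is due or cancel() wakes us up.
                long deadline = System.currentTimeMillis() + intervalMillis;
                long remaining = intervalMillis;
                while (running && remaining > 0) {
                    lock.wait(remaining);
                    remaining = deadline - System.currentTimeMillis();
                }
            }
        }
    }

    /** Stops the loop and wakes it up if it is currently waiting. */
    void cancel() {
        synchronized (lock) {
            running = false;
            lock.notifyAll();
        }
    }
}
```

Because the loader runs while the lock is held, cancel() returns only after
the current batch has been emitted.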
>>>>>>>>>
>>>>>>>>> On Wed, May 20, 2020 at 4:10 PM Alexander Fedulov <
>>>>>>>>> alexander@ververica.com> wrote:
>>>>>>>>>
>>>>>>>>>> We'd need more details to localize the problem. What are the last
>>>>>>>>>> things printed before the restart? Are there any actual error-level logs
>>>>>>>>>> there? Do you happen to find any JVM crash files
>>>>>>>>>> (hs_err_pidXXXX.log) on your Flink machines?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, May 20, 2020 at 4:01 PM Sara Arshad <
>>>>>>>>>> sara.arshad.86@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thank you for your response.
>>>>>>>>>>> I get
>>>>>>>>>>> Error when creating PropertyDescriptor for public final void
>>>>>>>>>>> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.AbstractConfiguration.setProperty...
>>>>>>>>>>> quite a lot. But it is logged at INFO level, and I also had it before adding the broadcast.
>>>>>>>>>>> I also retried an older version of my application and it still
>>>>>>>>>>> works fine.
>>>>>>>>>>> By the way, the scenario works fine between restarts.
>>>>>>>>>>>
>>>>>>>>>>> Best regards,
>>>>>>>>>>>
>>>>>>>>>>> On Wed, May 20, 2020 at 3:44 PM Alexander Fedulov <
>>>>>>>>>>> alexander@ververica.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Sara,
>>>>>>>>>>>>
>>>>>>>>>>>> do you have logs? Any exceptions in them?
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>>
>>>>>>>>>>>>

Re: Question about My Flink Application

Posted by Alexander Fedulov <al...@ververica.com>.
Returning the discussion to the mailing list (it accidentally went to a
side channel because of a direct reply).
What I was referring to is the event-time processing semantics, which are
based on the watermark mechanism [1].
If you are using it, the event time at your KeyedBroadcastProcessFunction
will be determined as the minimum of the maximum watermarks observed
across all of the input channels. In order not to stall the processing of
the events of the main data flow by the control channel (broadcast stream),
you could set its watermark to the maximum possible value, as shown in
this example [2].

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html
[2]
https://github.com/afedulov/fraud-detection-demo/blob/master/flink-job/src/main/java/com/ververica/field/dynamicrules/sources/RulesSource.java#L80
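
To illustrate the minimum-of-maximums rule described above, here is a toy
model in plain Java. It does not use Flink, and the names WatermarkModel,
onWatermark, and currentWatermark are made up for this sketch. It only shows
why an input that never advances its watermark holds back the whole operator,
and why setting that input to the maximum value removes it from the
calculation.

```java
import java.util.Arrays;

/** Toy model of how a multi-input operator derives its event time. */
final class WatermarkModel {
    private final long[] inputWatermarks; // highest watermark seen per input

    /** @param inputs number of input channels, at least 1 */
    WatermarkModel(int inputs) {
        inputWatermarks = new long[inputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark on one input; a lower value than before is ignored. */
    void onWatermark(int input, long watermark) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
    }

    /** The operator's event time: the minimum over the per-input maximums. */
    long currentWatermark() {
        return Arrays.stream(inputWatermarks).min().getAsLong();
    }
}
```

With two inputs (0 for the data stream, 1 for the broadcast stream), calling
onWatermark(0, 1000L) alone leaves currentWatermark() at Long.MIN_VALUE. After
onWatermark(1, Long.MAX_VALUE), currentWatermark() follows the data stream and
returns 1000.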

--

Alexander Fedulov | Solutions Architect

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference




Re: Question about My Flink Application

Posted by Alexander Fedulov <al...@ververica.com>.
Hi Sara,

do you have logs? Any exceptions in them?

Best,

--

Alexander Fedulov | Solutions Architect

+49 1514 6265796

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Tony) Cheng


