Posted to user@flink.apache.org by liujiangang <li...@gmail.com> on 2018/11/21 07:06:52 UTC

IntervalJoin is stuck in RocksDB's seek for too long in flink-1.6.2

I am using the IntervalJoin function to join two streams within a 10-minute
interval, as shown below:

labelStream.intervalJoin(adLogStream)
           .between(Time.milliseconds(0), Time.milliseconds(600000))
           .process(new processFunction())
           .sink(kafkaProducer)
labelStream and adLogStream carry protobuf objects and are keyed by a Long id.

Our two input streams are huge. After running for about 30 minutes, the output
to Kafka slowly drops, like this:
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1713/1.png> 

When the output started to drop, I ran jstack and pstack several times and
got these:
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1713/2.png> 
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1713/3.png> 

It seems the program is stuck in RocksDB's seek. I also found that some of
RocksDB's sst files are accessed slowly by iteration.
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1713/4.png> 

I have tried several ways:

1) Reduce the input volume by half. This works well.
2) Replace labelStream and adLogStream with simple Strings. This way, the
amount of data does not change. This works well.
3) Use PredefinedOptions such as SPINNING_DISK_OPTIMIZED and
SPINNING_DISK_OPTIMIZED_HIGH_MEM. This still fails.
4) Use newer versions of rocksdbjni. This still fails.

Can anyone give me some suggestions? Thank you very much.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: IntervalJoin is stuck in RocksDB's seek for too long in flink-1.6.2

Posted by liujiangang <li...@gmail.com>.
Yes, you are right. I added logging to record the time of each seek and found
that it is sometimes very slow. I then tested locally with the RocksDB files
and the same problem appears. Strangely, RocksDB's seek seems to iterate over
the data one by one. For now, I have added a block cache for RocksDB. It is
faster than before, but the problem is not completely solved. The added code
is below:
    public ColumnFamilyOptions createColumnOptions() {
        // return new ColumnFamilyOptions();
        // Configure a 1 GiB block cache for the block-based table format.
        BlockBasedTableConfig blockBasedTableConfig = new BlockBasedTableConfig();
        blockBasedTableConfig.setBlockCacheSize(1024L * 1024 * 1024);
        ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions();
        columnFamilyOptions.setTableFormatConfig(blockBasedTableConfig);
        return columnFamilyOptions;
    }





Re: IntervalJoin is stuck in RocksDB's seek for too long in flink-1.6.2

Posted by Stefan Richter <s....@data-artisans.com>.
Btw how did you make sure that it is stuck in the seek call and that the trace does not show different invocations of seek? This can indicate that seek is slow, but is not yet proof that you are stuck.
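
One way to tell a single long-running call apart from many short calls is to time every invocation and keep both the call count and the slowest single duration. The following is only a minimal sketch in plain Java: the class name CallTimer is hypothetical, the timed action is a stand-in for the real seek call, and the class is not thread-safe, so it assumes one timer per operator thread.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Records per-call latency so one long call can be told apart from many short ones. */
final class CallTimer {
    private long calls;       // number of completed invocations
    private long totalNanos;  // summed duration of all invocations
    private long maxNanos;    // duration of the slowest single invocation

    /** Runs the action, measures how long it took, and returns its result. */
    <T> T time(Supplier<T> action) {
        long start = System.nanoTime();
        try {
            return action.get();
        } finally {
            long elapsed = System.nanoTime() - start;
            calls++;
            totalNanos += elapsed;
            maxNanos = Math.max(maxNanos, elapsed);
        }
    }

    long calls() { return calls; }
    long totalMillis() { return TimeUnit.NANOSECONDS.toMillis(totalNanos); }
    long maxMillis() { return TimeUnit.NANOSECONDS.toMillis(maxNanos); }

    public static void main(String[] args) {
        CallTimer timer = new CallTimer();
        // Stand-in workload; in a real job this would wrap the state access instead.
        for (int i = 0; i < 1000; i++) {
            final int key = i;
            timer.time(() -> Integer.toString(key).hashCode());
        }
        System.out.printf("calls=%d total=%dms max=%dms%n",
                timer.calls(), timer.totalMillis(), timer.maxMillis());
    }
}
```

A high call count with a small maximum suggests that the stack traces caught different, individually fast invocations. A large maximum suggests that a single call really is slow.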

> On 22. Nov 2018, at 13:01, liujiangang <li...@gmail.com> wrote:
> 
> This is not my case. Thank you.


Re: IntervalJoin is stuck in RocksDB's seek for too long in flink-1.6.2

Posted by liujiangang <li...@gmail.com>.
This is not my case. Thank you.




Re: IntervalJoin is stuck in RocksDB's seek for too long in flink-1.6.2

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

are your RocksDB instances running on local SSDs or on something like EBS? I have previously seen cases where this happened because some EBS quota was exhausted and the performance got throttled.

Best,
Stefan

> On 22. Nov 2018, at 09:51, liujiangang <li...@gmail.com> wrote:
> 
> Thank you very much. I have some more details to add. Each record is 20KB.
> The parallelism is 500 and each TaskManager has 10G of memory. The memory is
> sufficient, and I think the parallelism is high enough. Only the intervalJoin
> thread is above 100% because of RocksDB's seek. I am confused about why
> RocksDB's seek takes so long but returns no result. I don't know how to
> debug RocksDB in Flink.


Re: IntervalJoin is stuck in RocksDB's seek for too long in flink-1.6.2

Posted by liujiangang <li...@gmail.com>.
Thank you very much. I have some more details to add. Each record is 20KB.
The parallelism is 500 and each TaskManager has 10G of memory. The memory is
sufficient, and I think the parallelism is high enough. Only the intervalJoin
thread is above 100% because of RocksDB's seek. I am confused about why
RocksDB's seek takes so long but returns no result. I don't know how to
debug RocksDB in Flink.




Re: IntervalJoin is stuck in RocksDB's seek for too long in flink-1.6.2

Posted by Xingcan Cui <xi...@gmail.com>.
Hi Jiangang,

The IntervalJoin is actually the DataStream-level implementation of the SQL time-windowed join[1]. 

To ensure the completeness of the join results, we have to cache all the records (from both sides) in the most recent time interval. That may lead to state backend problems when huge streams flood in.
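
To make the buffering concrete, here is a simplified in-memory model of an interval join for a single key. It is only a sketch for intuition and not Flink's actual operator: the class name IntervalJoinModel, the String payloads, and the use of sorted TreeMap buffers are all assumptions made for illustration. It shows that every record from either side has to stay buffered until the watermark guarantees that no partner can still arrive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Simplified single-key model of an interval join (illustration only, not Flink code). */
final class IntervalJoinModel {
    private final long lower;  // lower bound relative to the left timestamp, in ms
    private final long upper;  // upper bound relative to the left timestamp, in ms
    private final NavigableMap<Long, List<String>> leftBuffer = new TreeMap<>();
    private final NavigableMap<Long, List<String>> rightBuffer = new TreeMap<>();

    IntervalJoinModel(long lower, long upper) {
        if (lower > upper) throw new IllegalArgumentException("lower must be <= upper");
        this.lower = lower;
        this.upper = upper;
    }

    /** Buffers a left record and joins it with right records in [ts + lower, ts + upper]. */
    List<String> onLeft(long ts, String value) {
        leftBuffer.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        List<String> out = new ArrayList<>();
        for (List<String> bucket : rightBuffer.subMap(ts + lower, true, ts + upper, true).values()) {
            for (String right : bucket) out.add(value + "|" + right);
        }
        return out;
    }

    /** Buffers a right record and joins it with left records in [ts - upper, ts - lower]. */
    List<String> onRight(long ts, String value) {
        rightBuffer.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        List<String> out = new ArrayList<>();
        for (List<String> bucket : leftBuffer.subMap(ts - upper, true, ts - lower, true).values()) {
            for (String left : bucket) out.add(left + "|" + value);
        }
        return out;
    }

    /** Drops records that can no longer find a partner at or after the watermark. */
    void onWatermark(long watermark) {
        leftBuffer.headMap(watermark - upper, false).clear();
        rightBuffer.headMap(watermark + lower, false).clear();
    }

    /** Total number of records currently held in both buffers. */
    int bufferedRecords() {
        int n = 0;
        for (List<String> b : leftBuffer.values()) n += b.size();
        for (List<String> b : rightBuffer.values()) n += b.size();
        return n;
    }
}
```

With bounds of 0 and 600000 ms, as in the original job, each side keeps roughly ten minutes of records per key, so the buffered volume grows with both the input rate and the record size.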

One benefit of SQL is that the optimizer will help to reduce the join inputs as much as possible (e.g., via predicate pushdown), but that has to be done manually in DataStream programs. Thus, I suggest that you 1) try increasing the parallelism (and the number of nodes if possible); 2) filter out some records or reduce the number of fields in advance.
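
As a rough back-of-the-envelope check of why smaller records or fewer records help, the buffered state per side can be approximated as input rate times interval length times record size. In the sketch below, the 10-minute interval and the 20KB record size come from this thread, while the rate of 1,000 records per second is purely a hypothetical value chosen for illustration. Serialization overhead and RocksDB compression are ignored.

```java
/** Rough estimate of interval-join state size; an approximation, not a measurement. */
final class JoinStateEstimate {

    /** Bytes buffered per side: records per second x interval seconds x bytes per record. */
    static long bufferedBytes(long recordsPerSecond, long intervalSeconds, long bytesPerRecord) {
        return Math.multiplyExact(
                Math.multiplyExact(recordsPerSecond, intervalSeconds), bytesPerRecord);
    }

    public static void main(String[] args) {
        long intervalSeconds = 600;       // 10-minute join interval (from the thread)
        long bytesPerRecord = 20 * 1024;  // about 20KB per record (from the thread)
        long assumedRate = 1_000;         // hypothetical records per second, NOT from the thread

        long full = bufferedBytes(assumedRate, intervalSeconds, bytesPerRecord);
        // Same rate, but with each record trimmed to a tenth of its size before the join.
        long trimmed = bufferedBytes(assumedRate, intervalSeconds, bytesPerRecord / 10);

        System.out.println("full records:    " + full + " bytes per side");
        System.out.println("trimmed records: " + trimmed + " bytes per side");
    }
}
```

Because the estimate is linear in each factor, halving the input rate or dropping unused fields before the join reduces the buffered state proportionally.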

Best,
Xingcan

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins
