Posted to issues@ignite.apache.org by "Ilya Shishkov (Jira)" <ji...@apache.org> on 2021/12/20 18:26:00 UTC
[jira] [Updated] (IGNITE-16176) Make consumer request timeout configurable in KafkaToIgniteCdcStreamerApplier
[ https://issues.apache.org/jira/browse/IGNITE-16176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ilya Shishkov updated IGNITE-16176:
-----------------------------------
Description:
Currently, KafkaToIgniteCdcStreamerApplier performs requests to Kafka with a hard-coded timeout, {{DFLT_REQ_TIMEOUT}}:
{code:title=KafkaToIgniteCdcStreamerApplier#poll}
private void poll(KafkaConsumer<Integer, byte[]> cnsmr) throws IgniteCheckedException {
    ConsumerRecords<Integer, byte[]> recs = cnsmr.poll(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
    if (log.isDebugEnabled()) {
        log.debug(
            "Polled from consumer [assignments=" + cnsmr.assignment() + ",rcvdEvts=" + rcvdEvts.addAndGet(recs.count()) + ']'
        );
    }
    apply(F.iterator(recs, this::deserialize, true, rec -> F.isEmpty(caches) || caches.contains(rec.key())));
    cnsmr.commitSync(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
}
{code}
The timeout for requests to Kafka should be configurable.
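One possible shape for this, as a minimal sketch only: the timeout is held in a small settable object that falls back to a default, and both {{poll}} and {{commitSync}} would read it instead of the constant. The class name {{KafkaRequestTimeout}}, its methods, and the default value of 3 seconds are assumptions made for illustration. The issue does not state the actual value of {{DFLT_REQ_TIMEOUT}} or how the setting should be exposed.

```java
import java.time.Duration;

/**
 * Hypothetical holder for a configurable Kafka request timeout.
 * Not part of Ignite; the name and API are illustrative only.
 */
class KafkaRequestTimeout {
    /** Placeholder default in seconds; the real DFLT_REQ_TIMEOUT value is not given in the issue. */
    static final long DFLT_REQ_TIMEOUT_SEC = 3;

    /** Current timeout in seconds, initialised to the default. */
    private long timeoutSec = DFLT_REQ_TIMEOUT_SEC;

    /** Overrides the default; non-positive values are rejected. */
    void setTimeoutSeconds(long timeoutSec) {
        if (timeoutSec <= 0)
            throw new IllegalArgumentException("Timeout must be positive: " + timeoutSec);

        this.timeoutSec = timeoutSec;
    }

    /** Returns the timeout in the form accepted by KafkaConsumer#poll and KafkaConsumer#commitSync. */
    Duration asDuration() {
        return Duration.ofSeconds(timeoutSec);
    }
}
```

With such an object available to the applier, the two calls in {{poll}} could become {{cnsmr.poll(timeout.asDuration())}} and {{cnsmr.commitSync(timeout.asDuration())}}, where {{timeout}} is a hypothetical field of this type.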
was:
Now KafkaToIgniteCdcStreamerApplier performs requests with a hard-coded timeout:
{code:title=KafkaToIgniteCdcStreamerApplier#poll}
private void poll(KafkaConsumer<Integer, byte[]> cnsmr) throws IgniteCheckedException {
    ConsumerRecords<Integer, byte[]> recs = cnsmr.poll(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
    if (log.isDebugEnabled()) {
        log.debug(
            "Polled from consumer [assignments=" + cnsmr.assignment() + ",rcvdEvts=" + rcvdEvts.addAndGet(recs.count()) + ']'
        );
    }
    apply(F.iterator(recs, this::deserialize, true, rec -> F.isEmpty(caches) || caches.contains(rec.key())));
    cnsmr.commitSync(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
}
{code}
We should have configurable timeout for requests to the Kafka.
> Make consumer request timeout configurable in KafkaToIgniteCdcStreamerApplier
> -----------------------------------------------------------------------------
>
> Key: IGNITE-16176
> URL: https://issues.apache.org/jira/browse/IGNITE-16176
> Project: Ignite
> Issue Type: Improvement
> Reporter: Ilya Shishkov
> Priority: Minor
> Labels: IEP-59, ise
>
> Currently, KafkaToIgniteCdcStreamerApplier performs requests to Kafka with a hard-coded timeout, {{DFLT_REQ_TIMEOUT}}:
> {code:title=KafkaToIgniteCdcStreamerApplier#poll}
> private void poll(KafkaConsumer<Integer, byte[]> cnsmr) throws IgniteCheckedException {
>     ConsumerRecords<Integer, byte[]> recs = cnsmr.poll(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
>     if (log.isDebugEnabled()) {
>         log.debug(
>             "Polled from consumer [assignments=" + cnsmr.assignment() + ",rcvdEvts=" + rcvdEvts.addAndGet(recs.count()) + ']'
>         );
>     }
>     apply(F.iterator(recs, this::deserialize, true, rec -> F.isEmpty(caches) || caches.contains(rec.key())));
>     cnsmr.commitSync(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
> }
> {code}
> The timeout for requests to Kafka should be configurable.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)