Posted to commits@ignite.apache.org by ni...@apache.org on 2022/03/01 12:08:41 UTC

[ignite-extensions] branch master updated: IGNITE-16176 Adds configurable Kafka requests timeout. (#99)

This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ed8662  IGNITE-16176 Adds configurable Kafka requests timeout. (#99)
2ed8662 is described below

commit 2ed86621333be0af69224f33e08feeca96934a46
Author: Mikhail Petrov <32...@users.noreply.github.com>
AuthorDate: Tue Mar 1 15:08:05 2022 +0300

    IGNITE-16176 Adds configurable Kafka requests timeout. (#99)
---
 ...VersionConflictResolverCachePluginProvider.java |   2 +-
 ...CacheVersionConflictResolverPluginProvider.java |   2 +-
 .../ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java | 162 +++++++++++++++------
 .../ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java |  25 +++-
 .../cdc/kafka/KafkaToIgniteCdcStreamerApplier.java |  17 ++-
 .../KafkaToIgniteCdcStreamerConfiguration.java     |  35 ++++-
 .../cdc/kafka/CdcKafkaReplicationAppsTest.java     |   9 +-
 .../ignite/cdc/kafka/CdcKafkaReplicationTest.java  |  17 ++-
 .../resources/loader/kafka-to-ignite-correct.xml   |   6 +-
 .../src/test/resources/loader/kafka.properties     |   2 +-
 .../test/resources/replication/ignite-to-kafka.xml |  15 +-
 .../test/resources/replication/kafka-to-ignite.xml |   1 +
 12 files changed, 214 insertions(+), 79 deletions(-)

diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverCachePluginProvider.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverCachePluginProvider.java
index e7030fc..bcae805 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverCachePluginProvider.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverCachePluginProvider.java
@@ -96,7 +96,7 @@ public class CacheVersionConflictResolverCachePluginProvider<K, V, C extends Cac
     }
 
     /** {@inheritDoc} */
-    @Nullable public <T, K2, V2> T unwrapCacheEntry(Cache.Entry<K2, V2> entry, Class<T> cls) {
+    @Nullable @Override public <T, K2, V2> T unwrapCacheEntry(Cache.Entry<K2, V2> entry, Class<T> cls) {
         return null;
     }
 }
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverPluginProvider.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverPluginProvider.java
index 3f95b6f..5b9007f 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverPluginProvider.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverPluginProvider.java
@@ -167,7 +167,7 @@ public class CacheVersionConflictResolverPluginProvider<C extends PluginConfigur
     }
 
     /** {@inheritDoc} */
-    @Nullable public <T> T createComponent(PluginContext ctx, Class<T> cls) {
+    @Nullable @Override public <T> T createComponent(PluginContext ctx, Class<T> cls) {
         return null;
     }
 }
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
index af96169..c1cbbeb 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.cdc.kafka;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
@@ -35,7 +36,9 @@ import org.apache.ignite.internal.cdc.CdcMain;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgniteExperimental;
 import org.apache.ignite.resources.LoggerResource;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -47,6 +50,8 @@ import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.EVTS_CNT;
 import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.EVTS_CNT_DESC;
 import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.LAST_EVT_TIME;
 import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.LAST_EVT_TIME_DESC;
+import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_KAFKA_REQ_TIMEOUT;
+import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_MAX_BATCH_SIZE;
 import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
 
@@ -66,9 +71,10 @@ import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_
  * @see KafkaToIgniteCdcStreamer
  * @see CacheVersionConflictResolverImpl
  */
+@IgniteExperimental
 public class IgniteToKafkaCdcStreamer implements CdcConsumer {
-    /** Default kafka request timeout in seconds. */
-    public static final int DFLT_REQ_TIMEOUT = 5;
+    /** Default value for the flag that indicates whether entries only from primary nodes should be handled. */
+    public static final boolean DFLT_IS_ONLY_PRIMARY = false;
 
     /** Bytes sent metric name. */
     public static final String BYTES_SENT = "BytesSent";
@@ -84,22 +90,28 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
     private KafkaProducer<Integer, byte[]> producer;
 
     /** Handle only primary entry flag. */
-    private final boolean onlyPrimary;
+    private boolean onlyPrimary = DFLT_IS_ONLY_PRIMARY;
 
     /** Topic name. */
-    private final String topic;
+    private String topic;
 
     /** Kafka topic partitions count. */
-    private final int kafkaParts;
+    private int kafkaParts;
 
     /** Kafka properties. */
-    private final Properties kafkaProps;
+    private Properties kafkaProps;
 
     /** Cache IDs. */
-    private final Set<Integer> cachesIds;
+    private Set<Integer> cachesIds;
+
+    /** Cache names. */
+    private Collection<String> cacheNames;
 
     /** Max batch size. */
-    private final int maxBatchSize;
+    private int maxBatchSize = DFLT_MAX_BATCH_SIZE;
+
+    /** The maximum time to complete Kafka related requests, in milliseconds. */
+    private long kafkaReqTimeout = DFLT_KAFKA_REQ_TIMEOUT;
 
     /** Timestamp of last sent message. */
     private AtomicLongMetric lastMsgTs;
@@ -110,39 +122,6 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
     /** Count of sent messages.  */
     private AtomicLongMetric msgsSnt;
 
-    /**
-     * @param topic Topic name.
-     * @param kafkaParts Kafka partitions count.
-     * @param caches Cache names.
-     * @param maxBatchSize Maximum size of records concurrently sent to Kafka.
-     * @param onlyPrimary If {@code true} then stream only events from primaries.
-     * @param kafkaProps Kafka properties.
-     */
-    public IgniteToKafkaCdcStreamer(
-        String topic,
-        int kafkaParts,
-        Set<String> caches,
-        int maxBatchSize,
-        boolean onlyPrimary,
-        Properties kafkaProps
-    ) {
-        assert caches != null && !caches.isEmpty();
-
-        this.topic = topic;
-        this.kafkaParts = kafkaParts;
-        this.onlyPrimary = onlyPrimary;
-        this.kafkaProps = kafkaProps;
-        this.maxBatchSize = maxBatchSize;
-
-        cachesIds = caches.stream()
-            .mapToInt(CU::cacheId)
-            .boxed()
-            .collect(Collectors.toSet());
-
-        kafkaProps.setProperty(KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
-        kafkaProps.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-    }
-
     /** {@inheritDoc} */
     @Override public boolean onEvents(Iterator<CdcEvent> evts) {
         List<Future<RecordMetadata>> futs = new ArrayList<>();
@@ -169,7 +148,7 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
                 continue;
             }
 
-            if (!cachesIds.isEmpty() && !cachesIds.contains(evt.cacheId())) {
+            if (!cachesIds.contains(evt.cacheId())) {
                 if (log.isDebugEnabled())
                     log.debug("Event skipped because of cacheId [evt=" + evt + ']');
 
@@ -194,7 +173,7 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
         if (!futs.isEmpty()) {
             try {
                 for (Future<RecordMetadata> fut : futs)
-                    fut.get(DFLT_REQ_TIMEOUT, TimeUnit.SECONDS);
+                    fut.get(kafkaReqTimeout, TimeUnit.MILLISECONDS);
 
                 msgsSnt.add(futs.size());
 
@@ -213,6 +192,19 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
 
     /** {@inheritDoc} */
     @Override public void start(MetricRegistry mreg) {
+        A.notNull(kafkaProps, "Kafka properties");
+        A.notNull(topic, "Kafka topic");
+        A.notEmpty(cacheNames, "caches");
+        A.ensure(kafkaParts > 0, "The number of Kafka partitions must be explicitly set to a value greater than zero.");
+        A.ensure(kafkaReqTimeout >= 0, "The Kafka request timeout cannot be negative.");
+
+        kafkaProps.setProperty(KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
+        kafkaProps.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+
+        cachesIds = cacheNames.stream()
+            .map(CU::cacheId)
+            .collect(Collectors.toSet());
+
         try {
             producer = new KafkaProducer<>(kafkaProps);
 
@@ -232,4 +224,88 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
     @Override public void stop() {
         producer.close();
     }
+
+    /**
+     * Sets whether entries only from primary nodes should be handled.
+     *
+     * @param onlyPrimary Whether entries only from primary nodes should be handled.
+     * @return {@code this} for chaining.
+     */
+    public IgniteToKafkaCdcStreamer setOnlyPrimary(boolean onlyPrimary) {
+        this.onlyPrimary = onlyPrimary;
+
+        return this;
+    }
+
+    /**
+     * Sets topic that is used to send data to Kafka.
+     *
+     * @param topic Kafka topic.
+     * @return {@code this} for chaining.
+     */
+    public IgniteToKafkaCdcStreamer setTopic(String topic) {
+        this.topic = topic;
+
+        return this;
+    }
+
+    /**
+     * Sets number of Kafka partitions.
+     *
+     * @param kafkaParts Number of Kafka partitions.
+     * @return {@code this} for chaining.
+     */
+    public IgniteToKafkaCdcStreamer setKafkaPartitions(int kafkaParts) {
+        this.kafkaParts = kafkaParts;
+
+        return this;
+    }
+
+    /**
+     * Sets cache names that participate in CDC.
+     *
+     * @param caches Cache names.
+     * @return {@code this} for chaining.
+     */
+    public IgniteToKafkaCdcStreamer setCaches(Collection<String> caches) {
+        this.cacheNames = caches;
+
+        return this;
+    }
+
+    /**
+     * Sets maximum batch size.
+     *
+     * @param maxBatchSize Maximum batch size.
+     * @return {@code this} for chaining.
+     */
+    public IgniteToKafkaCdcStreamer setMaxBatchSize(int maxBatchSize) {
+        this.maxBatchSize = maxBatchSize;
+
+        return this;
+    }
+
+    /**
+     * Sets properties that are used to initiate connection to Kafka.
+     *
+     * @param kafkaProps Properties that are used to initiate connection to Kafka.
+     * @return {@code this} for chaining.
+     */
+    public IgniteToKafkaCdcStreamer setKafkaProperties(Properties kafkaProps) {
+        this.kafkaProps = kafkaProps;
+
+        return this;
+    }
+
+    /**
+     * Sets the maximum time to complete Kafka related requests, in milliseconds.
+     * 
+     * @param kafkaReqTimeout Timeout value.
+     * @return {@code this} for chaining.
+     */
+    public IgniteToKafkaCdcStreamer setKafkaRequestTimeout(long kafkaReqTimeout) {
+        this.kafkaReqTimeout = kafkaReqTimeout;
+
+        return this;
+    }
 }
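
For reference, here is a minimal sketch of wiring up the source-side streamer with the new setter-based configuration; it mirrors the updated test in CdcKafkaReplicationTest further down in this diff. The broker address, topic name, partition count, cache name and timeout value are illustrative only.

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.ignite.cdc.CdcConfiguration;
    import org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer;

    public class IgniteToKafkaSetup {
        public static void main(String[] args) {
            Properties kafkaProps = new Properties();
            kafkaProps.setProperty("bootstrap.servers", "127.0.0.1:9092"); // illustrative broker address

            IgniteToKafkaCdcStreamer streamer = new IgniteToKafkaCdcStreamer()
                .setTopic("ignite")                           // mandatory, validated in start()
                .setKafkaPartitions(16)                       // mandatory, must be > 0
                .setCaches(Collections.singleton("my-cache")) // mandatory, at least one cache
                .setMaxBatchSize(1024)                        // optional, defaults to DFLT_MAX_BATCH_SIZE
                .setOnlyPrimary(false)                        // optional, defaults to DFLT_IS_ONLY_PRIMARY
                .setKafkaProperties(kafkaProps)               // mandatory
                .setKafkaRequestTimeout(3_000L);              // optional, in milliseconds, defaults to DFLT_KAFKA_REQ_TIMEOUT

            CdcConfiguration cdcCfg = new CdcConfiguration();

            // The consumer is then picked up by the CDC application (e.g. CdcCommandLineStartup).
            cdcCfg.setConsumer(streamer);
        }
    }
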
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java
index e62a6f4..a0d3880 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java
@@ -35,8 +35,10 @@ import org.apache.ignite.internal.GridLoggerProxy;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.cdc.CdcMain;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteExperimental;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
@@ -83,6 +85,7 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZE
  * @see KafkaToIgniteCdcStreamerApplier
  * @see CacheConflictResolutionManagerImpl
  */
+@IgniteExperimental
 public class KafkaToIgniteCdcStreamer implements Runnable {
     /** Ignite configuration. */
     private final IgniteConfiguration igniteCfg;
@@ -109,6 +112,22 @@ public class KafkaToIgniteCdcStreamer implements Runnable {
         Properties kafkaProps,
         KafkaToIgniteCdcStreamerConfiguration streamerCfg
     ) {
+        A.notNull(streamerCfg.getTopic(), "Kafka topic");
+        A.ensure(
+            streamerCfg.getKafkaPartsFrom() >= 0,
+            "The Kafka partitions lower bound must be explicitly set to a value greater than or equal to zero.");
+        A.ensure(
+            streamerCfg.getKafkaPartsTo() > 0,
+            "The Kafka partitions upper bound must be explicitly set to a value greater than zero.");
+        A.ensure(
+            streamerCfg.getKafkaPartsTo() > streamerCfg.getKafkaPartsFrom(),
+            "The Kafka partitions upper bound must be greater than lower bound.");
+        A.ensure(streamerCfg.getKafkaRequestTimeout() >= 0, "The Kafka request timeout cannot be negative.");
+        A.ensure(streamerCfg.getThreadCount() > 0, "Threads count value must be greater than zero.");
+        A.ensure(
+            streamerCfg.getKafkaPartsTo() - streamerCfg.getKafkaPartsFrom() >= streamerCfg.getThreadCount(),
+            "Threads count must be less than or equal to the total Kafka partitions count.");
+
         this.igniteCfg = igniteCfg;
         this.kafkaProps = kafkaProps;
         this.streamerCfg = streamerCfg;
@@ -158,9 +177,6 @@ public class KafkaToIgniteCdcStreamer implements Runnable {
             int kafkaParts = streamerCfg.getKafkaPartsTo() - kafkaPartsFrom;
             int threadCnt = streamerCfg.getThreadCount();
 
-            assert kafkaParts >= threadCnt
-                : "Threads count bigger then kafka partitions count [kafkaParts=" + kafkaParts + ",threadCount=" + threadCnt + ']';
-
             int partPerApplier = kafkaParts / threadCnt;
 
             for (int i = 0; i < threadCnt; i++) {
@@ -179,7 +195,8 @@ public class KafkaToIgniteCdcStreamer implements Runnable {
                     kafkaPartsFrom + to,
                     caches,
                     streamerCfg.getMaxBatchSize(),
-                    stopped
+                    stopped,
+                    streamerCfg.getKafkaRequestTimeout()
                 );
 
                 appliers.add(applier);
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
index fbac6e0..985deb7 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
@@ -81,9 +81,6 @@ import org.apache.kafka.common.errors.WakeupException;
  * @see CacheEntryVersion
  */
 class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnable, AutoCloseable {
-    /** */
-    public static final int DFLT_REQ_TIMEOUT = 3;
-
     /** Ignite instance. */
     private final IgniteEx ign;
 
@@ -114,6 +111,9 @@ class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnab
     /** */
     private final AtomicLong rcvdEvts = new AtomicLong();
 
+    /** The maximum time to complete Kafka related requests, in milliseconds. */
+    private final long kafkaReqTimeout;
+
     /**
      * @param ign Ignite instance.
      * @param log Logger.
@@ -124,6 +124,7 @@ class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnab
      * @param caches Cache ids.
      * @param maxBatchSize Maximum batch size.
      * @param stopped Stopped flag.
+     * @param kafkaReqTimeout The maximum time to complete Kafka related requests, in milliseconds.
      */
     public KafkaToIgniteCdcStreamerApplier(
         IgniteEx ign,
@@ -134,7 +135,8 @@ class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnab
         int kafkaPartTo,
         Set<Integer> caches,
         int maxBatchSize,
-        AtomicBoolean stopped
+        AtomicBoolean stopped,
+        long kafkaReqTimeout
     ) {
         super(maxBatchSize);
 
@@ -146,6 +148,7 @@ class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnab
         this.caches = caches;
         this.stopped = stopped;
         this.log = log.getLogger(KafkaToIgniteCdcStreamerApplier.class);
+        this.kafkaReqTimeout = kafkaReqTimeout;
     }
 
     /** {@inheritDoc} */
@@ -182,7 +185,7 @@ class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnab
         finally {
             for (KafkaConsumer<Integer, byte[]> consumer : cnsmrs) {
                 try {
-                    consumer.close(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
+                    consumer.close(Duration.ofMillis(kafkaReqTimeout));
                 }
                 catch (Exception e) {
                     log.warning("Close error!", e);
@@ -201,7 +204,7 @@ class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnab
      * @param cnsmr Data consumer.
      */
     private void poll(KafkaConsumer<Integer, byte[]> cnsmr) throws IgniteCheckedException {
-        ConsumerRecords<Integer, byte[]> recs = cnsmr.poll(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
+        ConsumerRecords<Integer, byte[]> recs = cnsmr.poll(Duration.ofMillis(kafkaReqTimeout));
 
         if (log.isDebugEnabled()) {
             log.debug(
@@ -211,7 +214,7 @@ class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnab
 
         apply(F.iterator(recs, this::deserialize, true, rec -> F.isEmpty(caches) || caches.contains(rec.key())));
 
-        cnsmr.commitSync(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
+        cnsmr.commitSync(Duration.ofMillis(kafkaReqTimeout));
     }
 
     /**
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java
index 1e9026c..2a14609 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.ignite.cdc.kafka;
 import java.util.Collection;
 import java.util.Map;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.lang.IgniteExperimental;
 
 /**
  * Configuration of {@link KafkaToIgniteCdcStreamer} application.
@@ -27,27 +28,31 @@ import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
  * @see KafkaToIgniteCdcStreamer
  * @see KafkaToIgniteLoader
  */
+@IgniteExperimental
 public class KafkaToIgniteCdcStreamerConfiguration {
-    /** Default {@link #kafkaPartsTo} value. */
-    public static final int DFLT_PARTS = 16;
+    /** Default maximum time to complete Kafka related requests, in milliseconds. */
+    public static final long DFLT_KAFKA_REQ_TIMEOUT = 3_000L;
 
-    /** Default {@link #topic} value. */
-    public static final String DFLT_TOPIC = "ignite";
+    /** Default {@link #threadCnt} value. */
+    public static final int DFLT_THREAD_CNT = 16;
 
     /** Default {@link #maxBatchSize} value. */
     public static final int DFLT_MAX_BATCH_SIZE = 1024;
 
     /** {@link KafkaToIgniteCdcStreamerApplier} thread count. */
-    private int threadCnt = DFLT_PARTS;
+    private int threadCnt = DFLT_THREAD_CNT;
 
     /** Topic name. */
-    private String topic = DFLT_TOPIC;
+    private String topic;
 
     /** Kafka partitions lower bound (inclusive). */
-    private int kafkaPartsFrom = 0;
+    private int kafkaPartsFrom = -1;
 
     /** Kafka partitions higher bound (exclusive). */
-    private int kafkaPartsTo = DFLT_PARTS;
+    private int kafkaPartsTo;
+
+    /** The maximum time to complete Kafka related requests, in milliseconds. */
+    private long kafkaReqTimeout = DFLT_KAFKA_REQ_TIMEOUT;
 
     /**
      * Maximum batch size to apply to Ignite.
@@ -121,4 +126,18 @@ public class KafkaToIgniteCdcStreamerConfiguration {
     public void setCaches(Collection<String> caches) {
         this.caches = caches;
     }
+
+    /** @return The maximum time to complete Kafka related requests, in milliseconds. */
+    public long getKafkaRequestTimeout() {
+        return kafkaReqTimeout;
+    }
+
+    /**
+     * Sets the maximum time to complete Kafka related requests, in milliseconds.
+     *
+     * @param kafkaReqTimeout Timeout value.
+     */
+    public void setKafkaRequestTimeout(long kafkaReqTimeout) {
+        this.kafkaReqTimeout = kafkaReqTimeout;
+    }
 }
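
On the apply side, the topic and the partition bounds no longer have defaults and must be set explicitly, and the request timeout now lives in KafkaToIgniteCdcStreamerConfiguration. A minimal sketch follows, assuming the setter names that back the Spring properties used in the XML configs further down (topic, kafkaPartsFrom, kafkaPartsTo); the concrete values and consumer properties are illustrative.

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamer;
    import org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration;
    import org.apache.ignite.configuration.IgniteConfiguration;

    public class KafkaToIgniteSetup {
        public static void main(String[] args) {
            KafkaToIgniteCdcStreamerConfiguration streamerCfg = new KafkaToIgniteCdcStreamerConfiguration();

            streamerCfg.setTopic("ignite");             // no default topic anymore
            streamerCfg.setKafkaPartsFrom(0);           // defaults to -1, must be set explicitly
            streamerCfg.setKafkaPartsTo(16);            // must be > kafkaPartsFrom and >= kafkaPartsFrom + threadCount
            streamerCfg.setCaches(Collections.singletonList("my-cache"));
            streamerCfg.setKafkaRequestTimeout(3_000L); // optional, in milliseconds, defaults to DFLT_KAFKA_REQ_TIMEOUT

            IgniteConfiguration igniteCfg = new IgniteConfiguration(); // destination cluster configuration (illustrative)

            Properties kafkaProps = new Properties();
            kafkaProps.setProperty("bootstrap.servers", "127.0.0.1:9092"); // illustrative consumer properties
            kafkaProps.setProperty("group.id", "kafka-to-ignite");

            // KafkaToIgniteCdcStreamer implements Runnable; its constructor validates the configuration above.
            new Thread(new KafkaToIgniteCdcStreamer(igniteCfg, kafkaProps, streamerCfg)).start();
        }
    }
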
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
index bf51d4f..847cd9e 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
@@ -28,7 +28,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.startup.cmdline.CdcCommandLineStartup;
 
-import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_PARTS;
+import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_KAFKA_REQ_TIMEOUT;
 import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_PORT_RANGE;
 import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
 import static org.apache.ignite.testframework.GridTestUtils.runAsync;
@@ -69,6 +69,9 @@ public class CdcKafkaReplicationAppsTest extends CdcKafkaReplicationTest {
     public static final String MAX_BATCH_SIZE = "MAX_BATCH_SIZE";
 
     /** */
+    public static final String KAFKA_REQ_TIMEOUT = "KAFKA_REQ_TIMEOUT";
+
+    /** */
     public static final String PROPS_PATH = "PROPS_PATH";
 
     /** */
@@ -102,6 +105,7 @@ public class CdcKafkaReplicationAppsTest extends CdcKafkaReplicationTest {
         params.put(PARTS, Integer.toString(DFLT_PARTS));
         params.put(MAX_BATCH_SIZE, Integer.toString(KEYS_CNT));
         params.put(PROPS_PATH, kafkaPropsPath);
+        params.put(KAFKA_REQ_TIMEOUT, Long.toString(DFLT_KAFKA_REQ_TIMEOUT));
 
         return runAsync(
             () -> CdcCommandLineStartup.main(new String[] {prepareConfig("/replication/ignite-to-kafka.xml", params)})
@@ -129,6 +133,7 @@ public class CdcKafkaReplicationAppsTest extends CdcKafkaReplicationTest {
         params.put(PARTS_FROM, Integer.toString(partFrom));
         params.put(PARTS_TO, Integer.toString(partTo));
         params.put(THREAD_CNT, Integer.toString((partTo - partFrom) / 3));
+        params.put(KAFKA_REQ_TIMEOUT, Long.toString(DFLT_KAFKA_REQ_TIMEOUT));
 
         return runAsync(
             () -> KafkaToIgniteCommandLineStartup.main(new String[] {prepareConfig("/replication/kafka-to-ignite.xml", params)})
@@ -166,4 +171,4 @@ public class CdcKafkaReplicationAppsTest extends CdcKafkaReplicationTest {
             throw new RuntimeException(e);
         }
     }
-}
\ No newline at end of file
+}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
index 8f1bb5c..481ceb6 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
@@ -33,7 +33,7 @@ import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 
-import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_PARTS;
+import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_KAFKA_REQ_TIMEOUT;
 import static org.apache.ignite.testframework.GridTestUtils.runAsync;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 
@@ -48,6 +48,9 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
     public static final String DEST_SRC_TOPIC = "dest-source";
 
     /** */
+    public static final int DFLT_PARTS = 16;
+
+    /** */
     private static EmbeddedKafkaCluster KAFKA = null;
 
     /** {@inheritDoc} */
@@ -106,7 +109,6 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
             ));
         }
 
-
         return futs;
     }
 
@@ -141,8 +143,14 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
      */
     protected IgniteInternalFuture<?> igniteToKafka(IgniteConfiguration igniteCfg, String topic, String cache) {
         return runAsync(() -> {
-            IgniteToKafkaCdcStreamer cdcCnsmr =
-                new IgniteToKafkaCdcStreamer(topic, DFLT_PARTS, Collections.singleton(cache), KEYS_CNT, false, kafkaProperties());
+            IgniteToKafkaCdcStreamer cdcCnsmr = new IgniteToKafkaCdcStreamer()
+                .setTopic(topic)
+                .setKafkaPartitions(DFLT_PARTS)
+                .setCaches(Collections.singleton(cache))
+                .setMaxBatchSize(KEYS_CNT)
+                .setOnlyPrimary(false)
+                .setKafkaProperties(kafkaProperties())
+                .setKafkaRequestTimeout(DFLT_KAFKA_REQ_TIMEOUT);
 
             CdcConfiguration cdcCfg = new CdcConfiguration();
 
@@ -177,6 +185,7 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
 
         cfg.setCaches(Collections.singletonList(cacheName));
         cfg.setTopic(topic);
+        cfg.setKafkaRequestTimeout(DFLT_KAFKA_REQ_TIMEOUT);
 
         return runAsync(new KafkaToIgniteCdcStreamer(igniteCfg, kafkaProperties(), cfg));
     }
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
index d8a2efa..ce56a5d 100644
--- a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
@@ -36,7 +36,11 @@
         </property>
     </bean>
 
-    <bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration" />
+    <bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+        <property name="topic" value="ignite" />
+        <property name="kafkaPartsFrom" value="0" />
+        <property name="kafkaPartsTo" value="16" />
+    </bean>
 
     <util:properties id="kafkaProperties" location="loader/kafka.properties" />
 </beans>
\ No newline at end of file
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka.properties b/modules/cdc-ext/src/test/resources/loader/kafka.properties
index 58a46b9..cc090af 100644
--- a/modules/cdc-ext/src/test/resources/loader/kafka.properties
+++ b/modules/cdc-ext/src/test/resources/loader/kafka.properties
@@ -1,4 +1,4 @@
 bootstrap.servers=127.0.0.1
 key.serializer=ru.SomeClass
 value.serializer=ru.SomeOtherClass
-group.id=my-group
\ No newline at end of file
+group.id=my-group
diff --git a/modules/cdc-ext/src/test/resources/replication/ignite-to-kafka.xml b/modules/cdc-ext/src/test/resources/replication/ignite-to-kafka.xml
index 964daf0..4fcf726 100644
--- a/modules/cdc-ext/src/test/resources/replication/ignite-to-kafka.xml
+++ b/modules/cdc-ext/src/test/resources/replication/ignite-to-kafka.xml
@@ -44,18 +44,19 @@
     <bean id="cdc.cfg" class="org.apache.ignite.cdc.CdcConfiguration">
         <property name="consumer">
             <bean class="org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer">
-                <constructor-arg name="topic" value="{TOPIC}" />
-                <constructor-arg name="kafkaParts" value="{PARTS}" />
-                <constructor-arg name="caches">
+                <property name="topic" value="{TOPIC}" />
+                <property name="kafkaPartitions" value="{PARTS}" />
+                <property name="caches">
                     <util:list>
                         <bean class="java.lang.String">
                             <constructor-arg type="String" value="{REPLICATED_CACHE}" />
                         </bean>
                     </util:list>
-                </constructor-arg>
-                <constructor-arg name="maxBatchSize" value="{MAX_BATCH_SIZE}" />
-                <constructor-arg name="onlyPrimary" value="false" />
-                <constructor-arg name="kafkaProps" ref="kafkaProperties" />
+                </property>
+                <property name="maxBatchSize" value="{MAX_BATCH_SIZE}" />
+                <property name="onlyPrimary" value="false" />
+                <property name="kafkaProperties" ref="kafkaProperties" />
+                <property name="kafkaRequestTimeout" value="{KAFKA_REQ_TIMEOUT}" />
             </bean>
         </property>
     </bean>
diff --git a/modules/cdc-ext/src/test/resources/replication/kafka-to-ignite.xml b/modules/cdc-ext/src/test/resources/replication/kafka-to-ignite.xml
index 154b1b3..8bb35a4 100644
--- a/modules/cdc-ext/src/test/resources/replication/kafka-to-ignite.xml
+++ b/modules/cdc-ext/src/test/resources/replication/kafka-to-ignite.xml
@@ -55,6 +55,7 @@
         <property name="kafkaPartsFrom" value="{PARTS_FROM}"/>
         <property name="kafkaPartsTo" value="{PARTS_TO}"/>
         <property name="threadCount" value="{THREAD_CNT}"/>
+        <property name="kafkaRequestTimeout" value="{KAFKA_REQ_TIMEOUT}"/>
     </bean>
 
     <util:properties id="kafkaProperties" location="{PROPS_PATH}" />