You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2022/03/01 12:08:41 UTC
[ignite-extensions] branch master updated: IGNITE-16176 Adds configurable Kafka requests timeout. (#99)
This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new 2ed8662 IGNITE-16176 Adds configurable Kafka requests timeout. (#99)
2ed8662 is described below
commit 2ed86621333be0af69224f33e08feeca96934a46
Author: Mikhail Petrov <32...@users.noreply.github.com>
AuthorDate: Tue Mar 1 15:08:05 2022 +0300
IGNITE-16176 Adds configurable Kafka requests timeout. (#99)
---
...VersionConflictResolverCachePluginProvider.java | 2 +-
...CacheVersionConflictResolverPluginProvider.java | 2 +-
.../ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java | 162 +++++++++++++++------
.../ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java | 25 +++-
.../cdc/kafka/KafkaToIgniteCdcStreamerApplier.java | 17 ++-
.../KafkaToIgniteCdcStreamerConfiguration.java | 35 ++++-
.../cdc/kafka/CdcKafkaReplicationAppsTest.java | 9 +-
.../ignite/cdc/kafka/CdcKafkaReplicationTest.java | 17 ++-
.../resources/loader/kafka-to-ignite-correct.xml | 6 +-
.../src/test/resources/loader/kafka.properties | 2 +-
.../test/resources/replication/ignite-to-kafka.xml | 15 +-
.../test/resources/replication/kafka-to-ignite.xml | 1 +
12 files changed, 214 insertions(+), 79 deletions(-)
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverCachePluginProvider.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverCachePluginProvider.java
index e7030fc..bcae805 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverCachePluginProvider.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverCachePluginProvider.java
@@ -96,7 +96,7 @@ public class CacheVersionConflictResolverCachePluginProvider<K, V, C extends Cac
}
/** {@inheritDoc} */
- @Nullable public <T, K2, V2> T unwrapCacheEntry(Cache.Entry<K2, V2> entry, Class<T> cls) {
+ @Nullable @Override public <T, K2, V2> T unwrapCacheEntry(Cache.Entry<K2, V2> entry, Class<T> cls) {
return null;
}
}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverPluginProvider.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverPluginProvider.java
index 3f95b6f..5b9007f 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverPluginProvider.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverPluginProvider.java
@@ -167,7 +167,7 @@ public class CacheVersionConflictResolverPluginProvider<C extends PluginConfigur
}
/** {@inheritDoc} */
- @Nullable public <T> T createComponent(PluginContext ctx, Class<T> cls) {
+ @Nullable @Override public <T> T createComponent(PluginContext ctx, Class<T> cls) {
return null;
}
}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
index af96169..c1cbbeb 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
@@ -18,6 +18,7 @@
package org.apache.ignite.cdc.kafka;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
@@ -35,7 +36,9 @@ import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgniteExperimental;
import org.apache.ignite.resources.LoggerResource;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -47,6 +50,8 @@ import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.EVTS_CNT;
import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.EVTS_CNT_DESC;
import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.LAST_EVT_TIME;
import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.LAST_EVT_TIME_DESC;
+import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_KAFKA_REQ_TIMEOUT;
+import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_MAX_BATCH_SIZE;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
@@ -66,9 +71,10 @@ import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_
* @see KafkaToIgniteCdcStreamer
* @see CacheVersionConflictResolverImpl
*/
+@IgniteExperimental
public class IgniteToKafkaCdcStreamer implements CdcConsumer {
- /** Default kafka request timeout in seconds. */
- public static final int DFLT_REQ_TIMEOUT = 5;
+ /** Default value for the flag that indicates whether entries only from primary nodes should be handled. */
+ public static final boolean DFLT_IS_ONLY_PRIMARY = false;
/** Bytes sent metric name. */
public static final String BYTES_SENT = "BytesSent";
@@ -84,22 +90,28 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
private KafkaProducer<Integer, byte[]> producer;
/** Handle only primary entry flag. */
- private final boolean onlyPrimary;
+ private boolean onlyPrimary = DFLT_IS_ONLY_PRIMARY;
/** Topic name. */
- private final String topic;
+ private String topic;
/** Kafka topic partitions count. */
- private final int kafkaParts;
+ private int kafkaParts;
/** Kafka properties. */
- private final Properties kafkaProps;
+ private Properties kafkaProps;
/** Cache IDs. */
- private final Set<Integer> cachesIds;
+ private Set<Integer> cachesIds;
+
+ /** Cache names. */
+ private Collection<String> cacheNames;
/** Max batch size. */
- private final int maxBatchSize;
+ private int maxBatchSize = DFLT_MAX_BATCH_SIZE;
+
+ /** The maximum time to complete Kafka related requests, in milliseconds. */
+ private long kafkaReqTimeout = DFLT_KAFKA_REQ_TIMEOUT;
/** Timestamp of last sent message. */
private AtomicLongMetric lastMsgTs;
@@ -110,39 +122,6 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
/** Count of sent messages. */
private AtomicLongMetric msgsSnt;
- /**
- * @param topic Topic name.
- * @param kafkaParts Kafka partitions count.
- * @param caches Cache names.
- * @param maxBatchSize Maximum size of records concurrently sent to Kafka.
- * @param onlyPrimary If {@code true} then stream only events from primaries.
- * @param kafkaProps Kafka properties.
- */
- public IgniteToKafkaCdcStreamer(
- String topic,
- int kafkaParts,
- Set<String> caches,
- int maxBatchSize,
- boolean onlyPrimary,
- Properties kafkaProps
- ) {
- assert caches != null && !caches.isEmpty();
-
- this.topic = topic;
- this.kafkaParts = kafkaParts;
- this.onlyPrimary = onlyPrimary;
- this.kafkaProps = kafkaProps;
- this.maxBatchSize = maxBatchSize;
-
- cachesIds = caches.stream()
- .mapToInt(CU::cacheId)
- .boxed()
- .collect(Collectors.toSet());
-
- kafkaProps.setProperty(KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
- kafkaProps.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
- }
-
/** {@inheritDoc} */
@Override public boolean onEvents(Iterator<CdcEvent> evts) {
List<Future<RecordMetadata>> futs = new ArrayList<>();
@@ -169,7 +148,7 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
continue;
}
- if (!cachesIds.isEmpty() && !cachesIds.contains(evt.cacheId())) {
+ if (!cachesIds.contains(evt.cacheId())) {
if (log.isDebugEnabled())
log.debug("Event skipped because of cacheId [evt=" + evt + ']');
@@ -194,7 +173,7 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
if (!futs.isEmpty()) {
try {
for (Future<RecordMetadata> fut : futs)
- fut.get(DFLT_REQ_TIMEOUT, TimeUnit.SECONDS);
+ fut.get(kafkaReqTimeout, TimeUnit.MILLISECONDS);
msgsSnt.add(futs.size());
@@ -213,6 +192,19 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
/** {@inheritDoc} */
@Override public void start(MetricRegistry mreg) {
+ A.notNull(kafkaProps, "Kafka properties");
+ A.notNull(topic, "Kafka topic");
+ A.notEmpty(cacheNames, "caches");
+ A.ensure(kafkaParts > 0, "The number of Kafka partitions must be explicitly set to a value greater than zero.");
+ A.ensure(kafkaReqTimeout >= 0, "The Kafka request timeout cannot be negative.");
+
+ kafkaProps.setProperty(KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
+ kafkaProps.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+
+ cachesIds = cacheNames.stream()
+ .map(CU::cacheId)
+ .collect(Collectors.toSet());
+
try {
producer = new KafkaProducer<>(kafkaProps);
@@ -232,4 +224,88 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
@Override public void stop() {
producer.close();
}
+
+ /**
+ * Sets whether entries only from primary nodes should be handled.
+ *
+ * @param onlyPrimary Whether entries only from primary nodes should be handled.
+ * @return {@code this} for chaining.
+ */
+ public IgniteToKafkaCdcStreamer setOnlyPrimary(boolean onlyPrimary) {
+ this.onlyPrimary = onlyPrimary;
+
+ return this;
+ }
+
+ /**
+ * Sets topic that is used to send data to Kafka.
+ *
+ * @param topic Kafka topic.
+ * @return {@code this} for chaining.
+ */
+ public IgniteToKafkaCdcStreamer setTopic(String topic) {
+ this.topic = topic;
+
+ return this;
+ }
+
+ /**
+ * Sets number of Kafka partitions.
+ *
+ * @param kafkaParts Number of Kafka partitions.
+ * @return {@code this} for chaining.
+ */
+ public IgniteToKafkaCdcStreamer setKafkaPartitions(int kafkaParts) {
+ this.kafkaParts = kafkaParts;
+
+ return this;
+ }
+
+ /**
+ * Sets cache names that participate in CDC.
+ *
+ * @param caches Cache names.
+ * @return {@code this} for chaining.
+ */
+ public IgniteToKafkaCdcStreamer setCaches(Collection<String> caches) {
+ this.cacheNames = caches;
+
+ return this;
+ }
+
+ /**
+ * Sets maximum batch size.
+ *
+ * @param maxBatchSize Maximum batch size.
+ * @return {@code this} for chaining.
+ */
+ public IgniteToKafkaCdcStreamer setMaxBatchSize(int maxBatchSize) {
+ this.maxBatchSize = maxBatchSize;
+
+ return this;
+ }
+
+ /**
+ * Sets properties that are used to initiate connection to Kafka.
+ *
+ * @param kafkaProps Properties that are used to initiate connection to Kafka.
+ * @return {@code this} for chaining.
+ */
+ public IgniteToKafkaCdcStreamer setKafkaProperties(Properties kafkaProps) {
+ this.kafkaProps = kafkaProps;
+
+ return this;
+ }
+
+ /**
+ * Sets the maximum time to complete Kafka related requests, in milliseconds.
+ *
+ * @param kafkaReqTimeout Timeout value.
+ * @return {@code this} for chaining.
+ */
+ public IgniteToKafkaCdcStreamer setKafkaRequestTimeout(long kafkaReqTimeout) {
+ this.kafkaReqTimeout = kafkaReqTimeout;
+
+ return this;
+ }
}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java
index e62a6f4..a0d3880 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java
@@ -35,8 +35,10 @@ import org.apache.ignite.internal.GridLoggerProxy;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteExperimental;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
@@ -83,6 +85,7 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZE
* @see KafkaToIgniteCdcStreamerApplier
* @see CacheConflictResolutionManagerImpl
*/
+@IgniteExperimental
public class KafkaToIgniteCdcStreamer implements Runnable {
/** Ignite configuration. */
private final IgniteConfiguration igniteCfg;
@@ -109,6 +112,22 @@ public class KafkaToIgniteCdcStreamer implements Runnable {
Properties kafkaProps,
KafkaToIgniteCdcStreamerConfiguration streamerCfg
) {
+ A.notNull(streamerCfg.getTopic(), "Kafka topic");
+ A.ensure(
+ streamerCfg.getKafkaPartsFrom() >= 0,
+ "The Kafka partitions lower bound must be explicitly set to a value greater than or equals to zero.");
+ A.ensure(
+ streamerCfg.getKafkaPartsTo() > 0,
+ "The Kafka partitions upper bound must be explicitly set to a value greater than zero.");
+ A.ensure(
+ streamerCfg.getKafkaPartsTo() > streamerCfg.getKafkaPartsFrom(),
+ "The Kafka partitions upper bound must be greater than lower bound.");
+ A.ensure(streamerCfg.getKafkaRequestTimeout() >= 0, "The Kafka request timeout cannot be negative.");
+ A.ensure(streamerCfg.getThreadCount() > 0, "Threads count value must me greater than zero.");
+ A.ensure(
+ streamerCfg.getKafkaPartsTo() - streamerCfg.getKafkaPartsFrom() >= streamerCfg.getThreadCount(),
+ "Threads count must be less or equals to the total Kafka partitions count.");
+
this.igniteCfg = igniteCfg;
this.kafkaProps = kafkaProps;
this.streamerCfg = streamerCfg;
@@ -158,9 +177,6 @@ public class KafkaToIgniteCdcStreamer implements Runnable {
int kafkaParts = streamerCfg.getKafkaPartsTo() - kafkaPartsFrom;
int threadCnt = streamerCfg.getThreadCount();
- assert kafkaParts >= threadCnt
- : "Threads count bigger then kafka partitions count [kafkaParts=" + kafkaParts + ",threadCount=" + threadCnt + ']';
-
int partPerApplier = kafkaParts / threadCnt;
for (int i = 0; i < threadCnt; i++) {
@@ -179,7 +195,8 @@ public class KafkaToIgniteCdcStreamer implements Runnable {
kafkaPartsFrom + to,
caches,
streamerCfg.getMaxBatchSize(),
- stopped
+ stopped,
+ streamerCfg.getKafkaRequestTimeout()
);
appliers.add(applier);
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
index fbac6e0..985deb7 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
@@ -81,9 +81,6 @@ import org.apache.kafka.common.errors.WakeupException;
* @see CacheEntryVersion
*/
class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnable, AutoCloseable {
- /** */
- public static final int DFLT_REQ_TIMEOUT = 3;
-
/** Ignite instance. */
private final IgniteEx ign;
@@ -114,6 +111,9 @@ class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnab
/** */
private final AtomicLong rcvdEvts = new AtomicLong();
+ /** The maximum time to complete Kafka related requests, in milliseconds. */
+ private final long kafkaReqTimeout;
+
/**
* @param ign Ignite instance.
* @param log Logger.
@@ -124,6 +124,7 @@ class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnab
* @param caches Cache ids.
* @param maxBatchSize Maximum batch size.
* @param stopped Stopped flag.
+ * @param kafkaReqTimeout The maximum time to complete Kafka related requests, in milliseconds.
*/
public KafkaToIgniteCdcStreamerApplier(
IgniteEx ign,
@@ -134,7 +135,8 @@ class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnab
int kafkaPartTo,
Set<Integer> caches,
int maxBatchSize,
- AtomicBoolean stopped
+ AtomicBoolean stopped,
+ long kafkaReqTimeout
) {
super(maxBatchSize);
@@ -146,6 +148,7 @@ class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnab
this.caches = caches;
this.stopped = stopped;
this.log = log.getLogger(KafkaToIgniteCdcStreamerApplier.class);
+ this.kafkaReqTimeout = kafkaReqTimeout;
}
/** {@inheritDoc} */
@@ -182,7 +185,7 @@ class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnab
finally {
for (KafkaConsumer<Integer, byte[]> consumer : cnsmrs) {
try {
- consumer.close(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
+ consumer.close(Duration.ofMillis(kafkaReqTimeout));
}
catch (Exception e) {
log.warning("Close error!", e);
@@ -201,7 +204,7 @@ class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnab
* @param cnsmr Data consumer.
*/
private void poll(KafkaConsumer<Integer, byte[]> cnsmr) throws IgniteCheckedException {
- ConsumerRecords<Integer, byte[]> recs = cnsmr.poll(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
+ ConsumerRecords<Integer, byte[]> recs = cnsmr.poll(Duration.ofMillis(kafkaReqTimeout));
if (log.isDebugEnabled()) {
log.debug(
@@ -211,7 +214,7 @@ class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnab
apply(F.iterator(recs, this::deserialize, true, rec -> F.isEmpty(caches) || caches.contains(rec.key())));
- cnsmr.commitSync(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
+ cnsmr.commitSync(Duration.ofMillis(kafkaReqTimeout));
}
/**
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java
index 1e9026c..2a14609 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.ignite.cdc.kafka;
import java.util.Collection;
import java.util.Map;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.lang.IgniteExperimental;
/**
* Configuration of {@link KafkaToIgniteCdcStreamer} application.
@@ -27,27 +28,31 @@ import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
* @see KafkaToIgniteCdcStreamer
* @see KafkaToIgniteLoader
*/
+@IgniteExperimental
public class KafkaToIgniteCdcStreamerConfiguration {
- /** Default {@link #kafkaPartsTo} value. */
- public static final int DFLT_PARTS = 16;
+ /** Default maximum time to complete Kafka related requests, in milliseconds. */
+ public static final long DFLT_KAFKA_REQ_TIMEOUT = 3_000L;
- /** Default {@link #topic} value. */
- public static final String DFLT_TOPIC = "ignite";
+ /** Default {@link #threadCnt} value. */
+ public static final int DFLT_THREAD_CNT = 16;
/** Default {@link #maxBatchSize} value. */
public static final int DFLT_MAX_BATCH_SIZE = 1024;
/** {@link KafkaToIgniteCdcStreamerApplier} thread count. */
- private int threadCnt = DFLT_PARTS;
+ private int threadCnt = DFLT_THREAD_CNT;
/** Topic name. */
- private String topic = DFLT_TOPIC;
+ private String topic;
/** Kafka partitions lower bound (inclusive). */
- private int kafkaPartsFrom = 0;
+ private int kafkaPartsFrom = -1;
/** Kafka partitions higher bound (exclusive). */
- private int kafkaPartsTo = DFLT_PARTS;
+ private int kafkaPartsTo;
+
+ /** The maximum time to complete Kafka related requests, in milliseconds. */
+ private long kafkaReqTimeout = DFLT_KAFKA_REQ_TIMEOUT;
/**
* Maximum batch size to apply to Ignite.
@@ -121,4 +126,18 @@ public class KafkaToIgniteCdcStreamerConfiguration {
public void setCaches(Collection<String> caches) {
this.caches = caches;
}
+
+ /** @return The maximum time to complete Kafka related requests, in milliseconds. */
+ public long getKafkaRequestTimeout() {
+ return kafkaReqTimeout;
+ }
+
+ /**
+ * Sets the maximum time to complete Kafka related requests, in milliseconds.
+ *
+ * @param kafkaReqTimeout Timeout value.
+ */
+ public void setKafkaRequestTimeout(long kafkaReqTimeout) {
+ this.kafkaReqTimeout = kafkaReqTimeout;
+ }
}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
index bf51d4f..847cd9e 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
@@ -28,7 +28,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.startup.cmdline.CdcCommandLineStartup;
-import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_PARTS;
+import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_KAFKA_REQ_TIMEOUT;
import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_PORT_RANGE;
import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
@@ -69,6 +69,9 @@ public class CdcKafkaReplicationAppsTest extends CdcKafkaReplicationTest {
public static final String MAX_BATCH_SIZE = "MAX_BATCH_SIZE";
/** */
+ public static final String KAFKA_REQ_TIMEOUT = "KAFKA_REQ_TIMEOUT";
+
+ /** */
public static final String PROPS_PATH = "PROPS_PATH";
/** */
@@ -102,6 +105,7 @@ public class CdcKafkaReplicationAppsTest extends CdcKafkaReplicationTest {
params.put(PARTS, Integer.toString(DFLT_PARTS));
params.put(MAX_BATCH_SIZE, Integer.toString(KEYS_CNT));
params.put(PROPS_PATH, kafkaPropsPath);
+ params.put(KAFKA_REQ_TIMEOUT, Long.toString(DFLT_KAFKA_REQ_TIMEOUT));
return runAsync(
() -> CdcCommandLineStartup.main(new String[] {prepareConfig("/replication/ignite-to-kafka.xml", params)})
@@ -129,6 +133,7 @@ public class CdcKafkaReplicationAppsTest extends CdcKafkaReplicationTest {
params.put(PARTS_FROM, Integer.toString(partFrom));
params.put(PARTS_TO, Integer.toString(partTo));
params.put(THREAD_CNT, Integer.toString((partTo - partFrom) / 3));
+ params.put(KAFKA_REQ_TIMEOUT, Long.toString(DFLT_KAFKA_REQ_TIMEOUT));
return runAsync(
() -> KafkaToIgniteCommandLineStartup.main(new String[] {prepareConfig("/replication/kafka-to-ignite.xml", params)})
@@ -166,4 +171,4 @@ public class CdcKafkaReplicationAppsTest extends CdcKafkaReplicationTest {
throw new RuntimeException(e);
}
}
-}
\ No newline at end of file
+}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
index 8f1bb5c..481ceb6 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
@@ -33,7 +33,7 @@ import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_PARTS;
+import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_KAFKA_REQ_TIMEOUT;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
@@ -48,6 +48,9 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
public static final String DEST_SRC_TOPIC = "dest-source";
/** */
+ public static final int DFLT_PARTS = 16;
+
+ /** */
private static EmbeddedKafkaCluster KAFKA = null;
/** {@inheritDoc} */
@@ -106,7 +109,6 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
));
}
-
return futs;
}
@@ -141,8 +143,14 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
*/
protected IgniteInternalFuture<?> igniteToKafka(IgniteConfiguration igniteCfg, String topic, String cache) {
return runAsync(() -> {
- IgniteToKafkaCdcStreamer cdcCnsmr =
- new IgniteToKafkaCdcStreamer(topic, DFLT_PARTS, Collections.singleton(cache), KEYS_CNT, false, kafkaProperties());
+ IgniteToKafkaCdcStreamer cdcCnsmr = new IgniteToKafkaCdcStreamer()
+ .setTopic(topic)
+ .setKafkaPartitions(DFLT_PARTS)
+ .setCaches(Collections.singleton(cache))
+ .setMaxBatchSize(KEYS_CNT)
+ .setOnlyPrimary(false)
+ .setKafkaProperties(kafkaProperties())
+ .setKafkaRequestTimeout(DFLT_KAFKA_REQ_TIMEOUT);
CdcConfiguration cdcCfg = new CdcConfiguration();
@@ -177,6 +185,7 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
cfg.setCaches(Collections.singletonList(cacheName));
cfg.setTopic(topic);
+ cfg.setKafkaRequestTimeout(DFLT_KAFKA_REQ_TIMEOUT);
return runAsync(new KafkaToIgniteCdcStreamer(igniteCfg, kafkaProperties(), cfg));
}
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
index d8a2efa..ce56a5d 100644
--- a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
@@ -36,7 +36,11 @@
</property>
</bean>
- <bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration" />
+ <bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+ <property name="topic" value="ignite" />
+ <property name="kafkaPartsFrom" value="0" />
+ <property name="kafkaPartsTo" value="16" />
+ </bean>
<util:properties id="kafkaProperties" location="loader/kafka.properties" />
</beans>
\ No newline at end of file
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka.properties b/modules/cdc-ext/src/test/resources/loader/kafka.properties
index 58a46b9..cc090af 100644
--- a/modules/cdc-ext/src/test/resources/loader/kafka.properties
+++ b/modules/cdc-ext/src/test/resources/loader/kafka.properties
@@ -1,4 +1,4 @@
bootstrap.servers=127.0.0.1
key.serializer=ru.SomeClass
value.serializer=ru.SomeOtherClass
-group.id=my-group
\ No newline at end of file
+group.id=my-group
diff --git a/modules/cdc-ext/src/test/resources/replication/ignite-to-kafka.xml b/modules/cdc-ext/src/test/resources/replication/ignite-to-kafka.xml
index 964daf0..4fcf726 100644
--- a/modules/cdc-ext/src/test/resources/replication/ignite-to-kafka.xml
+++ b/modules/cdc-ext/src/test/resources/replication/ignite-to-kafka.xml
@@ -44,18 +44,19 @@
<bean id="cdc.cfg" class="org.apache.ignite.cdc.CdcConfiguration">
<property name="consumer">
<bean class="org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer">
- <constructor-arg name="topic" value="{TOPIC}" />
- <constructor-arg name="kafkaParts" value="{PARTS}" />
- <constructor-arg name="caches">
+ <property name="topic" value="{TOPIC}" />
+ <property name="kafkaPartitions" value="{PARTS}" />
+ <property name="caches">
<util:list>
<bean class="java.lang.String">
<constructor-arg type="String" value="{REPLICATED_CACHE}" />
</bean>
</util:list>
- </constructor-arg>
- <constructor-arg name="maxBatchSize" value="{MAX_BATCH_SIZE}" />
- <constructor-arg name="onlyPrimary" value="false" />
- <constructor-arg name="kafkaProps" ref="kafkaProperties" />
+ </property>
+ <property name="maxBatchSize" value="{MAX_BATCH_SIZE}" />
+ <property name="onlyPrimary" value="false" />
+ <property name="kafkaProperties" ref="kafkaProperties" />
+ <property name="kafkaRequestTimeout" value="{KAFKA_REQ_TIMEOUT}" />
</bean>
</property>
</bean>
diff --git a/modules/cdc-ext/src/test/resources/replication/kafka-to-ignite.xml b/modules/cdc-ext/src/test/resources/replication/kafka-to-ignite.xml
index 154b1b3..8bb35a4 100644
--- a/modules/cdc-ext/src/test/resources/replication/kafka-to-ignite.xml
+++ b/modules/cdc-ext/src/test/resources/replication/kafka-to-ignite.xml
@@ -55,6 +55,7 @@
<property name="kafkaPartsFrom" value="{PARTS_FROM}"/>
<property name="kafkaPartsTo" value="{PARTS_TO}"/>
<property name="threadCount" value="{THREAD_CNT}"/>
+ <property name="kafkaRequestTimeout" value="{KAFKA_REQ_TIMEOUT}"/>
</bean>
<util:properties id="kafkaProperties" location="{PROPS_PATH}" />