You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/04/13 14:32:49 UTC
[skywalking] branch master updated: Simplify kafka configuration in cluster mode (#8857)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 9059ff5827 Simplify kafka configuration in cluster mode (#8857)
9059ff5827 is described below
commit 9059ff5827b615475764ee2b7d72875058e169f0
Author: Daming <zt...@foxmail.com>
AuthorDate: Wed Apr 13 22:32:35 2022 +0800
Simplify kafka configuration in cluster mode (#8857)
---
docs/en/changes/changes.md | 1 +
docs/en/setup/backend/configuration-vocabulary.md | 2 --
.../agent/kafka/KafkaFetcherHandlerRegister.java | 35 ++++++++--------------
.../agent/kafka/module/KafkaFetcherConfig.java | 10 -------
.../agent/kafka/provider/KafkaFetcherProvider.java | 4 +--
.../provider/handler/AbstractKafkaHandler.java | 6 ----
.../kafka/provider/handler/JVMMetricsHandler.java | 2 +-
.../agent/kafka/provider/handler/KafkaHandler.java | 26 ----------------
.../agent/kafka/provider/handler/LogHandler.java | 5 ----
.../provider/handler/MeterServiceHandler.java | 4 +--
.../provider/handler/ServiceManagementHandler.java | 7 ++---
.../provider/handler/TraceSegmentHandler.java | 2 +-
.../src/main/resources/application.yml | 2 --
13 files changed, 23 insertions(+), 83 deletions(-)
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 1e3679b17d..92ce648544 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -12,6 +12,7 @@
* [Refactor] Move SQLDatabase(H2/MySQL/PostgreSQL), ElasticSearch and BanyanDB specific configurations out of column.
* Support BanyanDB global index for entities. Log and Segment record entities declare this new feature.
* Remove unnecessary analyzer settings in columns of templates. Many were added due to analyzer's default value.
+* Simplify the Kafka Fetch configuration in cluster mode.
#### UI
diff --git a/docs/en/setup/backend/configuration-vocabulary.md b/docs/en/setup/backend/configuration-vocabulary.md
index ee892a5c3f..1b54ab8fcc 100644
--- a/docs/en/setup/backend/configuration-vocabulary.md
+++ b/docs/en/setup/backend/configuration-vocabulary.md
@@ -213,8 +213,6 @@ core|default|role|Option values: `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | bootstrapServers | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. | SW_KAFKA_FETCHER_SERVERS | localhost:9092 |
| - | - | namespace | Namespace aims to isolate multi OAP cluster when using the same Kafka cluster. If you set a namespace for Kafka fetcher, OAP will add a prefix to topic name. You should also set namespace in `agent.config`. The property is named `plugin.kafka.namespace`. | SW_NAMESPACE | - |
| - | - | groupId | A unique string that identifies the consumer group to which this consumer belongs.| - | skywalking-consumer |
-| - | - | consumePartitions | Indicates which PartitionId(s) of the topics is/are assigned to the OAP server. Separated by commas if multiple. | SW_KAFKA_FETCHER_CONSUME_PARTITIONS | - |
-| - | - | isSharding | True when OAP Server is in cluster. | SW_KAFKA_FETCHER_IS_SHARDING | false |
| - | - | createTopicIfNotExist | If true, this creates Kafka topic (if it does not already exist). | - | true |
| - | - | partitions | The number of partitions for the topic being created. | SW_KAFKA_FETCHER_PARTITIONS | 3 |
| - | - | enableNativeProtoLog | Enables fetching and handling native proto log data. | SW_KAFKA_FETCHER_ENABLE_NATIVE_PROTO_LOG | true |
diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/KafkaFetcherHandlerRegister.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/KafkaFetcherHandlerRegister.java
index eb12139a54..c41fbd7dad 100644
--- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/KafkaFetcherHandlerRegister.java
+++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/KafkaFetcherHandlerRegister.java
@@ -19,7 +19,6 @@
package org.apache.skywalking.oap.server.analyzer.agent.kafka;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
@@ -38,11 +37,9 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.module.KafkaFetcherConfig;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler.KafkaHandler;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
@@ -57,14 +54,10 @@ public class KafkaFetcherHandlerRegister implements Runnable {
private ImmutableMap.Builder<String, KafkaHandler> builder = ImmutableMap.builder();
private ImmutableMap<String, KafkaHandler> handlerMap;
- private List<TopicPartition> topicPartitions = Lists.newArrayList();
- private KafkaConsumer<String, Bytes> consumer = null;
+ private final KafkaConsumer<String, Bytes> consumer;
private final KafkaFetcherConfig config;
- private final boolean isSharding;
private final Properties properties;
- private int threadPoolSize = Runtime.getRuntime().availableProcessors() * 2;
- private int threadPoolQueueSize = 10000;
private final ThreadPoolExecutor executor;
private final boolean enableKafkaMessageAutoCommit;
@@ -76,14 +69,11 @@ public class KafkaFetcherHandlerRegister implements Runnable {
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
- if (config.isSharding() && StringUtil.isNotEmpty(config.getConsumePartitions())) {
- isSharding = true;
- } else {
- isSharding = false;
- }
+ int threadPoolSize = Runtime.getRuntime().availableProcessors() * 2;
if (config.getKafkaHandlerThreadPoolSize() > 0) {
threadPoolSize = config.getKafkaHandlerThreadPoolSize();
}
+ int threadPoolQueueSize = 10000;
if (config.getKafkaHandlerThreadPoolQueueSize() > 0) {
threadPoolQueueSize = config.getKafkaHandlerThreadPoolQueueSize();
}
@@ -101,19 +91,15 @@ public class KafkaFetcherHandlerRegister implements Runnable {
public void register(KafkaHandler handler) {
builder.put(handler.getTopic(), handler);
- topicPartitions.addAll(handler.getTopicPartitions());
}
public void start() throws ModuleStartException {
handlerMap = builder.build();
+ builder = null;
createTopicIfNeeded(handlerMap.keySet(), properties);
- if (isSharding) {
- consumer.assign(topicPartitions);
- } else {
- consumer.subscribe(handlerMap.keySet());
- }
+ consumer.subscribe(handlerMap.keySet());
consumer.seekToEnd(consumer.assignment());
executor.submit(this);
}
@@ -125,7 +111,7 @@ public class KafkaFetcherHandlerRegister implements Runnable {
ConsumerRecords<String, Bytes> consumerRecords = consumer.poll(Duration.ofMillis(500L));
if (!consumerRecords.isEmpty()) {
for (final ConsumerRecord<String, Bytes> record : consumerRecords) {
- executor.submit(() -> handlerMap.get(record.topic()).handle(record));
+ executor.submit(() -> Objects.requireNonNull(handlerMap.get(record.topic())).handle(record));
}
if (!enableKafkaMessageAutoCommit) {
consumer.commitAsync();
@@ -138,7 +124,12 @@ public class KafkaFetcherHandlerRegister implements Runnable {
}
private void createTopicIfNeeded(Collection<String> topics, Properties properties) throws ModuleStartException {
- AdminClient adminClient = AdminClient.create(properties);
+ Properties adminProps = new Properties();
+ adminProps.putAll(properties);
+ // remove 'group.id' to avoid unknown configure warning.
+ adminProps.remove(ConsumerConfig.GROUP_ID_CONFIG);
+
+ AdminClient adminClient = AdminClient.create(adminProps);
Set<String> missedTopics = adminClient.describeTopics(topics)
.values()
.entrySet()
@@ -155,7 +146,7 @@ public class KafkaFetcherHandlerRegister implements Runnable {
.collect(Collectors.toSet());
if (!missedTopics.isEmpty()) {
- log.info("Topics" + missedTopics.toString() + " not exist.");
+ log.info("Topics " + missedTopics + " not exist.");
List<NewTopic> newTopicList = missedTopics.stream()
.map(topic -> new NewTopic(
topic,
diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/module/KafkaFetcherConfig.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/module/KafkaFetcherConfig.java
index 40358aa81f..941abb7625 100644
--- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/module/KafkaFetcherConfig.java
+++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/module/KafkaFetcherConfig.java
@@ -42,16 +42,6 @@ public class KafkaFetcherConfig extends ModuleConfig {
*/
private String groupId = "skywalking-consumer";
- /**
- * Which PartitionId(s) of the topics assign to the OAP server. If more than one, is separated by commas.
- */
- private String consumePartitions = "";
-
- /**
- * isSharding was true when OAP Server in cluster.
- */
- private boolean isSharding = false;
-
/**
* If true, create the Kafka topic when it does not exist.
*/
diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/KafkaFetcherProvider.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/KafkaFetcherProvider.java
index c07036ffee..288a410074 100644
--- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/KafkaFetcherProvider.java
+++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/KafkaFetcherProvider.java
@@ -42,7 +42,7 @@ import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
@Slf4j
public class KafkaFetcherProvider extends ModuleProvider {
private KafkaFetcherHandlerRegister handlerRegister;
- private KafkaFetcherConfig config;
+ private final KafkaFetcherConfig config;
public KafkaFetcherProvider() {
config = new KafkaFetcherConfig();
@@ -64,7 +64,7 @@ public class KafkaFetcherProvider extends ModuleProvider {
}
@Override
- public void prepare() throws ServiceNotProvidedException, ModuleStartException {
+ public void prepare() throws ServiceNotProvidedException {
handlerRegister = new KafkaFetcherHandlerRegister(config);
}
diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/AbstractKafkaHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/AbstractKafkaHandler.java
index 946008bed9..51093fe84e 100644
--- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/AbstractKafkaHandler.java
+++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/AbstractKafkaHandler.java
@@ -45,10 +45,4 @@ public abstract class AbstractKafkaHandler implements KafkaHandler {
}
protected abstract String getPlainTopic();
-
- @Override
- public String getConsumePartitions() {
- return config.getConsumePartitions();
- }
-
}
diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JVMMetricsHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JVMMetricsHandler.java
index 2eddb20d0e..465d5f5d9e 100644
--- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JVMMetricsHandler.java
+++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JVMMetricsHandler.java
@@ -93,7 +93,7 @@ public class JVMMetricsHandler extends AbstractKafkaHandler {
builder.setServiceInstance(namingLengthControl.formatInstanceName(builder.getServiceInstance()));
builder.getMetricsList().forEach(jvmMetric -> {
- try (Timer timer = histogram.createTimer()) {
+ try (Timer ignored2 = histogram.createTimer()) {
jvmSourceDispatcher.sendMetric(builder.getService(), builder.getServiceInstance(), jvmMetric);
} catch (Exception e) {
errorCounter.inc();
diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/KafkaHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/KafkaHandler.java
index 02acb0aeb8..22363a0658 100644
--- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/KafkaHandler.java
+++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/KafkaHandler.java
@@ -18,40 +18,14 @@
package org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler;
-import com.google.common.collect.Lists;
-import java.util.Collections;
-import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.skywalking.oap.server.library.util.StringUtil;
/**
* A Handler for dealing Message reported by agent. It is binding to a topic of Kafka, and deserialize.
*/
public interface KafkaHandler {
- /**
- * Which partition(s) of the topic is handled in cluster mode. Currently, we have to specify the handler working for
- * partition(s).
- */
- default List<TopicPartition> getTopicPartitions() {
- if (StringUtil.isEmpty(getConsumePartitions())) {
- return Collections.EMPTY_LIST;
- }
-
- List<TopicPartition> topicPartitions = Lists.newArrayList();
- for (final String partition : getConsumePartitions().trim().split("\\s*,\\s*")) {
- topicPartitions.add(new TopicPartition(getTopic(), Integer.parseInt(partition)));
- }
- return topicPartitions;
- }
-
- /**
- *
- */
- String getConsumePartitions();
-
/**
* A topic of Kafka is handled.
*/
diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/LogHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/LogHandler.java
index ac7e8a2552..c563d7a5cc 100644
--- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/LogHandler.java
+++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/LogHandler.java
@@ -62,11 +62,6 @@ public class LogHandler extends AbstractKafkaHandler {
);
}
- @Override
- public String getConsumePartitions() {
- return config.getConsumePartitions();
- }
-
@Override
protected String getPlainTopic() {
return config.getTopicNameOfLogs();
diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/MeterServiceHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/MeterServiceHandler.java
index aa36144b1a..25efc83050 100644
--- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/MeterServiceHandler.java
+++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/MeterServiceHandler.java
@@ -71,11 +71,11 @@ public class MeterServiceHandler extends AbstractKafkaHandler {
@Override
public void handle(final ConsumerRecord<String, Bytes> record) {
- try (HistogramMetrics.Timer timer = histogramBatch.createTimer()) {
+ try (HistogramMetrics.Timer ignored = histogramBatch.createTimer()) {
MeterDataCollection meterDataCollection = MeterDataCollection.parseFrom(record.value().get());
MeterProcessor processor = processService.createProcessor();
meterDataCollection.getMeterDataList().forEach(meterData -> {
- try (HistogramMetrics.Timer ignored = histogram.createTimer()) {
+ try (HistogramMetrics.Timer ignored2 = histogram.createTimer()) {
processor.read(meterData);
} catch (Exception e) {
errorCounter.inc();
diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ServiceManagementHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ServiceManagementHandler.java
index 7e8e4f2330..8d6bb77a22 100644
--- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ServiceManagementHandler.java
+++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ServiceManagementHandler.java
@@ -21,7 +21,6 @@ package org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler;
import com.google.gson.JsonObject;
import java.util.ArrayList;
import java.util.List;
-import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
@@ -71,7 +70,7 @@ public class ServiceManagementHandler extends AbstractKafkaHandler {
}
}
- private final void serviceReportProperties(InstanceProperties request) {
+ private void serviceReportProperties(InstanceProperties request) {
ServiceInstanceUpdate serviceInstanceUpdate = new ServiceInstanceUpdate();
final String serviceName = namingLengthControl.formatServiceName(request.getService());
final String instanceName = namingLengthControl.formatInstanceName(request.getServiceInstance());
@@ -92,14 +91,14 @@ public class ServiceManagementHandler extends AbstractKafkaHandler {
properties.addProperty(prop.getKey(), prop.getValue());
}
});
- properties.addProperty(InstanceTraffic.PropertyUtil.IPV4S, ipv4List.stream().collect(Collectors.joining(",")));
+ properties.addProperty(InstanceTraffic.PropertyUtil.IPV4S, String.join(",", ipv4List));
serviceInstanceUpdate.setProperties(properties);
serviceInstanceUpdate.setTimeBucket(
TimeBucket.getTimeBucket(System.currentTimeMillis(), DownSampling.Minute));
sourceReceiver.receive(serviceInstanceUpdate);
}
- private final void keepAlive(InstancePingPkg request) {
+ private void keepAlive(InstancePingPkg request) {
final long timeBucket = TimeBucket.getTimeBucket(System.currentTimeMillis(), DownSampling.Minute);
final String serviceName = namingLengthControl.formatServiceName(request.getService());
final String instanceName = namingLengthControl.formatInstanceName(request.getServiceInstance());
diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/TraceSegmentHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/TraceSegmentHandler.java
index 4b7189a3ed..8eee60589f 100644
--- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/TraceSegmentHandler.java
+++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/TraceSegmentHandler.java
@@ -69,7 +69,7 @@ public class TraceSegmentHandler extends AbstractKafkaHandler {
@Override
public void handle(final ConsumerRecord<String, Bytes> record) {
- try (HistogramMetrics.Timer ignore = histogram.createTimer()) {
+ try (HistogramMetrics.Timer ignored = histogram.createTimer()) {
SegmentObject segment = SegmentObject.parseFrom(record.value().get());
if (log.isDebugEnabled()) {
log.debug(
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index d074f6cccc..0047fba2c6 100755
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -367,8 +367,6 @@ kafka-fetcher:
replicationFactor: ${SW_KAFKA_FETCHER_PARTITIONS_FACTOR:2}
enableNativeProtoLog: ${SW_KAFKA_FETCHER_ENABLE_NATIVE_PROTO_LOG:true}
enableNativeJsonLog: ${SW_KAFKA_FETCHER_ENABLE_NATIVE_JSON_LOG:true}
- isSharding: ${SW_KAFKA_FETCHER_IS_SHARDING:false}
- consumePartitions: ${SW_KAFKA_FETCHER_CONSUME_PARTITIONS:""}
kafkaHandlerThreadPoolSize: ${SW_KAFKA_HANDLER_THREAD_POOL_SIZE:-1}
kafkaHandlerThreadPoolQueueSize: ${SW_KAFKA_HANDLER_THREAD_POOL_QUEUE_SIZE:-1}