You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/04/13 14:32:49 UTC

[skywalking] branch master updated: Simplify kafka configuration in cluster mode (#8857)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 9059ff5827 Simplify kafka configuration in cluster mode (#8857)
9059ff5827 is described below

commit 9059ff5827b615475764ee2b7d72875058e169f0
Author: Daming <zt...@foxmail.com>
AuthorDate: Wed Apr 13 22:32:35 2022 +0800

    Simplify kafka configuration in cluster mode (#8857)
---
 docs/en/changes/changes.md                         |  1 +
 docs/en/setup/backend/configuration-vocabulary.md  |  2 --
 .../agent/kafka/KafkaFetcherHandlerRegister.java   | 35 ++++++++--------------
 .../agent/kafka/module/KafkaFetcherConfig.java     | 10 -------
 .../agent/kafka/provider/KafkaFetcherProvider.java |  4 +--
 .../provider/handler/AbstractKafkaHandler.java     |  6 ----
 .../kafka/provider/handler/JVMMetricsHandler.java  |  2 +-
 .../agent/kafka/provider/handler/KafkaHandler.java | 26 ----------------
 .../agent/kafka/provider/handler/LogHandler.java   |  5 ----
 .../provider/handler/MeterServiceHandler.java      |  4 +--
 .../provider/handler/ServiceManagementHandler.java |  7 ++---
 .../provider/handler/TraceSegmentHandler.java      |  2 +-
 .../src/main/resources/application.yml             |  2 --
 13 files changed, 23 insertions(+), 83 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 1e3679b17d..92ce648544 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -12,6 +12,7 @@
 * [Refactor] Move SQLDatabase(H2/MySQL/PostgreSQL), ElasticSearch and BanyanDB specific configurations out of column.
 * Support BanyanDB global index for entities. Log and Segment record entities declare this new feature.
 * Remove unnecessary analyzer settings in columns of templates. Many were added due to analyzer's default value.
+* Simplify the Kafka Fetcher configuration in cluster mode.
 
 #### UI
 
diff --git a/docs/en/setup/backend/configuration-vocabulary.md b/docs/en/setup/backend/configuration-vocabulary.md
index ee892a5c3f..1b54ab8fcc 100644
--- a/docs/en/setup/backend/configuration-vocabulary.md
+++ b/docs/en/setup/backend/configuration-vocabulary.md
@@ -213,8 +213,6 @@ core|default|role|Option values: `Mixed/Receiver/Aggregator`. **Receiver** mode
 | - | - | bootstrapServers | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. | SW_KAFKA_FETCHER_SERVERS | localhost:9092                                                          |
 | - | - | namespace | Namespace aims to isolate multi OAP cluster when using the same Kafka cluster. If you set a namespace for Kafka fetcher, OAP will add a prefix to topic name. You should also set namespace in `agent.config`. The property is named `plugin.kafka.namespace`. | SW_NAMESPACE | -                                                                       |
 | - | - | groupId | A unique string that identifies the consumer group to which this consumer belongs.| - | skywalking-consumer                                                     |
-| - | - | consumePartitions | Indicates which PartitionId(s) of the topics is/are assigned to the OAP server. Separated by commas if multiple. | SW_KAFKA_FETCHER_CONSUME_PARTITIONS | -                                                                       |
-| - | - | isSharding | True when OAP Server is in cluster. | SW_KAFKA_FETCHER_IS_SHARDING | false                                                                   |
 | - | - | createTopicIfNotExist | If true, this creates Kafka topic (if it does not already exist). | - | true                                                                    |
 | - | - | partitions | The number of partitions for the topic being created. | SW_KAFKA_FETCHER_PARTITIONS | 3                                                                       |
 | - | - | enableNativeProtoLog | Enables fetching and handling native proto log data. | SW_KAFKA_FETCHER_ENABLE_NATIVE_PROTO_LOG | true                                                                    |
diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/KafkaFetcherHandlerRegister.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/KafkaFetcherHandlerRegister.java
index eb12139a54..c41fbd7dad 100644
--- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/KafkaFetcherHandlerRegister.java
+++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/KafkaFetcherHandlerRegister.java
@@ -19,7 +19,6 @@
 package org.apache.skywalking.oap.server.analyzer.agent.kafka;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.List;
@@ -38,11 +37,9 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.BytesDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.skywalking.oap.server.library.util.StringUtil;
 import org.apache.skywalking.oap.server.analyzer.agent.kafka.module.KafkaFetcherConfig;
 import org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler.KafkaHandler;
 import org.apache.skywalking.oap.server.library.module.ModuleStartException;
@@ -57,14 +54,10 @@ public class KafkaFetcherHandlerRegister implements Runnable {
     private ImmutableMap.Builder<String, KafkaHandler> builder = ImmutableMap.builder();
     private ImmutableMap<String, KafkaHandler> handlerMap;
 
-    private List<TopicPartition> topicPartitions = Lists.newArrayList();
-    private KafkaConsumer<String, Bytes> consumer = null;
+    private final KafkaConsumer<String, Bytes> consumer;
     private final KafkaFetcherConfig config;
-    private final boolean isSharding;
     private final Properties properties;
 
-    private int threadPoolSize = Runtime.getRuntime().availableProcessors() * 2;
-    private int threadPoolQueueSize = 10000;
     private final ThreadPoolExecutor executor;
     private final boolean enableKafkaMessageAutoCommit;
 
@@ -76,14 +69,11 @@ public class KafkaFetcherHandlerRegister implements Runnable {
         properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());
         properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
 
-        if (config.isSharding() && StringUtil.isNotEmpty(config.getConsumePartitions())) {
-            isSharding = true;
-        } else {
-            isSharding = false;
-        }
+        int threadPoolSize = Runtime.getRuntime().availableProcessors() * 2;
         if (config.getKafkaHandlerThreadPoolSize() > 0) {
             threadPoolSize = config.getKafkaHandlerThreadPoolSize();
         }
+        int threadPoolQueueSize = 10000;
         if (config.getKafkaHandlerThreadPoolQueueSize() > 0) {
             threadPoolQueueSize = config.getKafkaHandlerThreadPoolQueueSize();
         }
@@ -101,19 +91,15 @@ public class KafkaFetcherHandlerRegister implements Runnable {
 
     public void register(KafkaHandler handler) {
         builder.put(handler.getTopic(), handler);
-        topicPartitions.addAll(handler.getTopicPartitions());
     }
 
     public void start() throws ModuleStartException {
         handlerMap = builder.build();
+        builder = null;
 
         createTopicIfNeeded(handlerMap.keySet(), properties);
 
-        if (isSharding) {
-            consumer.assign(topicPartitions);
-        } else {
-            consumer.subscribe(handlerMap.keySet());
-        }
+        consumer.subscribe(handlerMap.keySet());
         consumer.seekToEnd(consumer.assignment());
         executor.submit(this);
     }
@@ -125,7 +111,7 @@ public class KafkaFetcherHandlerRegister implements Runnable {
                 ConsumerRecords<String, Bytes> consumerRecords = consumer.poll(Duration.ofMillis(500L));
                 if (!consumerRecords.isEmpty()) {
                     for (final ConsumerRecord<String, Bytes> record : consumerRecords) {
-                        executor.submit(() -> handlerMap.get(record.topic()).handle(record));
+                        executor.submit(() -> Objects.requireNonNull(handlerMap.get(record.topic())).handle(record));
                     }
                     if (!enableKafkaMessageAutoCommit) {
                         consumer.commitAsync();
@@ -138,7 +124,12 @@ public class KafkaFetcherHandlerRegister implements Runnable {
     }
 
     private void createTopicIfNeeded(Collection<String> topics, Properties properties) throws ModuleStartException {
-        AdminClient adminClient = AdminClient.create(properties);
+        Properties adminProps = new Properties();
+        adminProps.putAll(properties);
+        // Remove 'group.id': it is a consumer-only setting, so AdminClient would log an unknown-config warning for it.
+        adminProps.remove(ConsumerConfig.GROUP_ID_CONFIG);
+
+        AdminClient adminClient = AdminClient.create(adminProps);
         Set<String> missedTopics = adminClient.describeTopics(topics)
                 .values()
                 .entrySet()
@@ -155,7 +146,7 @@ public class KafkaFetcherHandlerRegister implements Runnable {
                 .collect(Collectors.toSet());
 
         if (!missedTopics.isEmpty()) {
-            log.info("Topics" + missedTopics.toString() + " not exist.");
+            log.info("Topics " + missedTopics + " not exist.");
             List<NewTopic> newTopicList = missedTopics.stream()
                     .map(topic -> new NewTopic(
                             topic,
diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/module/KafkaFetcherConfig.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/module/KafkaFetcherConfig.java
index 40358aa81f..941abb7625 100644
--- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/module/KafkaFetcherConfig.java
+++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/module/KafkaFetcherConfig.java
@@ -42,16 +42,6 @@ public class KafkaFetcherConfig extends ModuleConfig {
      */
     private String groupId = "skywalking-consumer";
 
-    /**
-     * Which PartitionId(s) of the topics assign to the OAP server. If more than one, is separated by commas.
-     */
-    private String consumePartitions = "";
-
-    /**
-     * isSharding was true when OAP Server in cluster.
-     */
-    private boolean isSharding = false;
-
     /**
      * If true, create the Kafka topic when it does not exist.
      */
diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/KafkaFetcherProvider.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/KafkaFetcherProvider.java
index c07036ffee..288a410074 100644
--- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/KafkaFetcherProvider.java
+++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/KafkaFetcherProvider.java
@@ -42,7 +42,7 @@ import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
 @Slf4j
 public class KafkaFetcherProvider extends ModuleProvider {
     private KafkaFetcherHandlerRegister handlerRegister;
-    private KafkaFetcherConfig config;
+    private final KafkaFetcherConfig config;
 
     public KafkaFetcherProvider() {
         config = new KafkaFetcherConfig();
@@ -64,7 +64,7 @@ public class KafkaFetcherProvider extends ModuleProvider {
     }
 
     @Override
-    public void prepare() throws ServiceNotProvidedException, ModuleStartException {
+    public void prepare() throws ServiceNotProvidedException {
         handlerRegister = new KafkaFetcherHandlerRegister(config);
     }
 
diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/AbstractKafkaHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/AbstractKafkaHandler.java
index 946008bed9..51093fe84e 100644
--- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/AbstractKafkaHandler.java
+++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/AbstractKafkaHandler.java
@@ -45,10 +45,4 @@ public abstract class AbstractKafkaHandler implements KafkaHandler {
     }
 
     protected abstract String getPlainTopic();
-
-    @Override
-    public String getConsumePartitions() {
-        return config.getConsumePartitions();
-    }
-
 }
diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JVMMetricsHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JVMMetricsHandler.java
index 2eddb20d0e..465d5f5d9e 100644
--- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JVMMetricsHandler.java
+++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JVMMetricsHandler.java
@@ -93,7 +93,7 @@ public class JVMMetricsHandler extends AbstractKafkaHandler {
             builder.setServiceInstance(namingLengthControl.formatInstanceName(builder.getServiceInstance()));
 
             builder.getMetricsList().forEach(jvmMetric -> {
-                try (Timer timer = histogram.createTimer()) {
+                try (Timer ignored2 = histogram.createTimer()) {
                     jvmSourceDispatcher.sendMetric(builder.getService(), builder.getServiceInstance(), jvmMetric);
                 } catch (Exception e) {
                     errorCounter.inc();
diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/KafkaHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/KafkaHandler.java
index 02acb0aeb8..22363a0658 100644
--- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/KafkaHandler.java
+++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/KafkaHandler.java
@@ -18,40 +18,14 @@
 
 package org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler;
 
-import com.google.common.collect.Lists;
-import java.util.Collections;
-import java.util.List;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.skywalking.oap.server.library.util.StringUtil;
 
 /**
  * A Handler for dealing Message reported by agent. It is binding to a topic of Kafka, and deserialize.
  */
 public interface KafkaHandler {
 
-    /**
-     * Which partition(s) of the topic is handled in cluster mode. Currently, we have to specify the handler working for
-     * partition(s).
-     */
-    default List<TopicPartition> getTopicPartitions() {
-        if (StringUtil.isEmpty(getConsumePartitions())) {
-            return Collections.EMPTY_LIST;
-        }
-
-        List<TopicPartition> topicPartitions = Lists.newArrayList();
-        for (final String partition : getConsumePartitions().trim().split("\\s*,\\s*")) {
-            topicPartitions.add(new TopicPartition(getTopic(), Integer.parseInt(partition)));
-        }
-        return topicPartitions;
-    }
-
-    /**
-     *
-     */
-    String getConsumePartitions();
-
     /**
      * A topic of Kafka is handled.
      */
diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/LogHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/LogHandler.java
index ac7e8a2552..c563d7a5cc 100644
--- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/LogHandler.java
+++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/LogHandler.java
@@ -62,11 +62,6 @@ public class LogHandler extends AbstractKafkaHandler {
         );
     }
 
-    @Override
-    public String getConsumePartitions() {
-        return config.getConsumePartitions();
-    }
-
     @Override
     protected String getPlainTopic() {
         return config.getTopicNameOfLogs();
diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/MeterServiceHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/MeterServiceHandler.java
index aa36144b1a..25efc83050 100644
--- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/MeterServiceHandler.java
+++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/MeterServiceHandler.java
@@ -71,11 +71,11 @@ public class MeterServiceHandler extends AbstractKafkaHandler {
 
     @Override
     public void handle(final ConsumerRecord<String, Bytes> record) {
-        try (HistogramMetrics.Timer timer = histogramBatch.createTimer()) {
+        try (HistogramMetrics.Timer ignored = histogramBatch.createTimer()) {
             MeterDataCollection meterDataCollection = MeterDataCollection.parseFrom(record.value().get());
             MeterProcessor processor = processService.createProcessor();
             meterDataCollection.getMeterDataList().forEach(meterData -> {
-                try (HistogramMetrics.Timer ignored = histogram.createTimer()) {
+                try (HistogramMetrics.Timer ignored2 = histogram.createTimer()) {
                     processor.read(meterData);
                 } catch (Exception e) {
                     errorCounter.inc();
diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ServiceManagementHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ServiceManagementHandler.java
index 7e8e4f2330..8d6bb77a22 100644
--- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ServiceManagementHandler.java
+++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ServiceManagementHandler.java
@@ -21,7 +21,6 @@ package org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler;
 import com.google.gson.JsonObject;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.utils.Bytes;
@@ -71,7 +70,7 @@ public class ServiceManagementHandler extends AbstractKafkaHandler {
         }
     }
 
-    private final void serviceReportProperties(InstanceProperties request) {
+    private void serviceReportProperties(InstanceProperties request) {
         ServiceInstanceUpdate serviceInstanceUpdate = new ServiceInstanceUpdate();
         final String serviceName = namingLengthControl.formatServiceName(request.getService());
         final String instanceName = namingLengthControl.formatInstanceName(request.getServiceInstance());
@@ -92,14 +91,14 @@ public class ServiceManagementHandler extends AbstractKafkaHandler {
                 properties.addProperty(prop.getKey(), prop.getValue());
             }
         });
-        properties.addProperty(InstanceTraffic.PropertyUtil.IPV4S, ipv4List.stream().collect(Collectors.joining(",")));
+        properties.addProperty(InstanceTraffic.PropertyUtil.IPV4S, String.join(",", ipv4List));
         serviceInstanceUpdate.setProperties(properties);
         serviceInstanceUpdate.setTimeBucket(
             TimeBucket.getTimeBucket(System.currentTimeMillis(), DownSampling.Minute));
         sourceReceiver.receive(serviceInstanceUpdate);
     }
 
-    private final void keepAlive(InstancePingPkg request) {
+    private void keepAlive(InstancePingPkg request) {
         final long timeBucket = TimeBucket.getTimeBucket(System.currentTimeMillis(), DownSampling.Minute);
         final String serviceName = namingLengthControl.formatServiceName(request.getService());
         final String instanceName = namingLengthControl.formatInstanceName(request.getServiceInstance());
diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/TraceSegmentHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/TraceSegmentHandler.java
index 4b7189a3ed..8eee60589f 100644
--- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/TraceSegmentHandler.java
+++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/TraceSegmentHandler.java
@@ -69,7 +69,7 @@ public class TraceSegmentHandler extends AbstractKafkaHandler {
 
     @Override
     public void handle(final ConsumerRecord<String, Bytes> record) {
-        try (HistogramMetrics.Timer ignore = histogram.createTimer()) {
+        try (HistogramMetrics.Timer ignored = histogram.createTimer()) {
             SegmentObject segment = SegmentObject.parseFrom(record.value().get());
             if (log.isDebugEnabled()) {
                 log.debug(
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index d074f6cccc..0047fba2c6 100755
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -367,8 +367,6 @@ kafka-fetcher:
     replicationFactor: ${SW_KAFKA_FETCHER_PARTITIONS_FACTOR:2}
     enableNativeProtoLog: ${SW_KAFKA_FETCHER_ENABLE_NATIVE_PROTO_LOG:true}
     enableNativeJsonLog: ${SW_KAFKA_FETCHER_ENABLE_NATIVE_JSON_LOG:true}
-    isSharding: ${SW_KAFKA_FETCHER_IS_SHARDING:false}
-    consumePartitions: ${SW_KAFKA_FETCHER_CONSUME_PARTITIONS:""}
     kafkaHandlerThreadPoolSize: ${SW_KAFKA_HANDLER_THREAD_POOL_SIZE:-1}
     kafkaHandlerThreadPoolQueueSize: ${SW_KAFKA_HANDLER_THREAD_POOL_QUEUE_SIZE:-1}