You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/07/18 07:37:30 UTC

[inlong] branch master updated: [INLONG-5092][SDK] Change Kafka default partition assignment strategy to RangeAssignor (#5093)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 785665410 [INLONG-5092][SDK] Change Kafka default partition assignment strategy to RangeAssignor (#5093)
785665410 is described below

commit 7856654107fe7031bade71ae869e9c2422eabd68
Author: 卢春亮 <94...@qq.com>
AuthorDate: Mon Jul 18 15:37:24 2022 +0800

    [INLONG-5092][SDK] Change Kafka default partition assignment strategy to RangeAssignor (#5093)
---
 .../sink/pulsarzone/PulsarClusterProducer.java     | 50 ++++++++++++++++------
 .../sdk/sort/impl/InLongTopicManagerImpl.java      | 18 ++++----
 .../sdk/sort/impl/kafka/AckOffsetOnRebalance.java  | 28 ++++++------
 .../sort/impl/kafka/InLongKafkaFetcherImpl.java    | 45 +++++++++++--------
 .../standalone/source/sortsdk/SortSdkSource.java   |  7 ++-
 5 files changed, 91 insertions(+), 57 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarClusterProducer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarClusterProducer.java
index 7a13fbbc1..6900baa21 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarClusterProducer.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarClusterProducer.java
@@ -98,9 +98,9 @@ public class PulsarClusterProducer implements LifecycleAware {
     /**
      * Constructor
      * 
-     * @param workerName
-     * @param config
-     * @param context
+     * @param workerName Worker name
+     * @param config Cache cluster configuration
+     * @param context Sink context
      */
     public PulsarClusterProducer(String workerName, CacheClusterConfig config, PulsarZoneSinkContext context) {
         this.workerName = workerName;
@@ -109,8 +109,8 @@ public class PulsarClusterProducer implements LifecycleAware {
         this.context = context.getProducerContext();
         this.state = LifecycleState.IDLE;
         this.cacheClusterName = config.getClusterName();
-        this.tenant = config.getParams().getOrDefault(KEY_TENANT, "pulsar");
-        this.namespace = config.getParams().getOrDefault(KEY_NAMESPACE, "inlong");
+        this.tenant = config.getParams().get(KEY_TENANT);
+        this.namespace = config.getParams().get(KEY_NAMESPACE);
     }
 
     /**
@@ -160,7 +160,7 @@ public class PulsarClusterProducer implements LifecycleAware {
     /**
      * getPulsarCompressionType
      * 
-     * @return CompressionType
+     * @return CompressionType LZ4/NONE/ZLIB/ZSTD/SNAPPY
      */
     private CompressionType getPulsarCompressionType() {
         String type = this.context.getString(KEY_COMPRESSIONTYPE, CompressionType.SNAPPY.name());
@@ -204,7 +204,7 @@ public class PulsarClusterProducer implements LifecycleAware {
     /**
      * getLifecycleState
      * 
-     * @return
+     * @return LifecycleState state
      */
     @Override
     public LifecycleState getLifecycleState() {
@@ -212,21 +212,21 @@ public class PulsarClusterProducer implements LifecycleAware {
     }
 
     /**
-     * send
+     * send DispatchProfile
      * 
-     * @param event
+     * @param event DispatchProfile
+     * @return boolean sendResult
      */
     public boolean send(DispatchProfile event) {
         try {
             // topic
-            String baseTopic = sinkContext.getIdTopicHolder().getTopic(event.getUid());
-            if (baseTopic == null) {
+            String producerTopic = this.getProducerTopic(event);
+            if (producerTopic == null) {
                 sinkContext.addSendResultMetric(event, event.getUid(), false, 0);
                 event.fail();
                 return false;
             }
             // get producer
-            String producerTopic = tenant + '/' + namespace + '/' + baseTopic;
             Producer<byte[]> producer = this.producerMap.get(producerTopic);
             if (producer == null) {
                 try {
@@ -282,11 +282,33 @@ public class PulsarClusterProducer implements LifecycleAware {
         }
     }
 
+    /**
+     * Build the full topic name as [tenant/][namespace/]baseTopic, omitting a null tenant or namespace.
+     * 
+     * @param event DispatchProfile
+     * @return String Full topic name, or null if no base topic is found for the event uid
+     */
+    private String getProducerTopic(DispatchProfile event) {
+        String baseTopic = sinkContext.getIdTopicHolder().getTopic(event.getUid());
+        if (baseTopic == null) {
+            return null;
+        }
+        StringBuilder builder = new StringBuilder();
+        if (tenant != null) {
+            builder.append(tenant).append("/");
+        }
+        if (namespace != null) {
+            builder.append(namespace).append("/");
+        }
+        builder.append(baseTopic);
+        return builder.toString();
+    }
+
     /**
      * encodeCacheMessageHeaders
      * 
-     * @param  event
-     * @return       Map
+     * @param  event DispatchProfile
+     * @return Map cache message headers
      */
     public Map<String, String> encodeCacheMessageHeaders(DispatchProfile event) {
         Map<String, String> headers = new HashMap<>();
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InLongTopicManagerImpl.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InLongTopicManagerImpl.java
index 250570b1b..65f3cc724 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InLongTopicManagerImpl.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InLongTopicManagerImpl.java
@@ -76,13 +76,13 @@ public class InLongTopicManagerImpl extends InLongTopicManager {
     }
 
     private boolean initFetcher(InLongTopicFetcher fetcher, InLongTopic inLongTopic) {
-        if (InlongTopicTypeEnum.PULSAR.getName().equals(inLongTopic.getTopicType())) {
+        if (InlongTopicTypeEnum.PULSAR.getName().equalsIgnoreCase(inLongTopic.getTopicType())) {
             logger.info("create fetcher topic is pulsar {}", inLongTopic);
             return fetcher.init(pulsarClients.get(inLongTopic.getInLongCluster().getClusterId()));
-        } else if (InlongTopicTypeEnum.KAFKA.getName().equals(inLongTopic.getTopicType())) {
+        } else if (InlongTopicTypeEnum.KAFKA.getName().equalsIgnoreCase(inLongTopic.getTopicType())) {
             logger.info("create fetcher topic is kafka {}", inLongTopic);
             return fetcher.init(inLongTopic.getInLongCluster().getBootstraps());
-        } else if (InlongTopicTypeEnum.TUBE.getName().equals(inLongTopic.getTopicType())) {
+        } else if (InlongTopicTypeEnum.TUBE.getName().equalsIgnoreCase(inLongTopic.getTopicType())) {
             logger.info("create fetcher topic is tube {}", inLongTopic);
             return fetcher.init(tubeFactories.get(inLongTopic.getInLongCluster().getClusterId()));
         } else {
@@ -130,13 +130,13 @@ public class InLongTopicManagerImpl extends InLongTopicManager {
      * @return {@link InLongTopicFetcher}
      */
     private InLongTopicFetcher createInLongTopicFetcher(InLongTopic inLongTopic) {
-        if (InlongTopicTypeEnum.PULSAR.getName().equals(inLongTopic.getTopicType())) {
+        if (InlongTopicTypeEnum.PULSAR.getName().equalsIgnoreCase(inLongTopic.getTopicType())) {
             logger.info("the topic is pulsar {}", inLongTopic);
             return new InLongPulsarFetcherImpl(inLongTopic, context);
-        } else if (InlongTopicTypeEnum.KAFKA.getName().equals(inLongTopic.getTopicType())) {
+        } else if (InlongTopicTypeEnum.KAFKA.getName().equalsIgnoreCase(inLongTopic.getTopicType())) {
             logger.info("the topic is kafka {}", inLongTopic);
             return new InLongKafkaFetcherImpl(inLongTopic, context);
-        } else if (InlongTopicTypeEnum.TUBE.getName().equals(inLongTopic.getTopicType())) {
+        } else if (InlongTopicTypeEnum.TUBE.getName().equalsIgnoreCase(inLongTopic.getTopicType())) {
             logger.info("the topic is tube {}", inLongTopic);
             return new InLongTubeFetcherImpl(inLongTopic, context);
         } else {
@@ -345,13 +345,13 @@ public class InLongTopicManagerImpl extends InLongTopicManager {
     }
 
     private void onlineTopic(InLongTopic inLongTopic) {
-        if (InlongTopicTypeEnum.PULSAR.getName().equals(inLongTopic.getTopicType())) {
+        if (InlongTopicTypeEnum.PULSAR.getName().equalsIgnoreCase(inLongTopic.getTopicType())) {
             logger.info("the topic is pulsar:{}", inLongTopic);
             onlinePulsarTopic(inLongTopic);
-        } else if (InlongTopicTypeEnum.KAFKA.getName().equals(inLongTopic.getTopicType())) {
+        } else if (InlongTopicTypeEnum.KAFKA.getName().equalsIgnoreCase(inLongTopic.getTopicType())) {
             logger.info("the topic is kafka:{}", inLongTopic);
             onlineKafkaTopic(inLongTopic);
-        } else if (InlongTopicTypeEnum.TUBE.getName().equals(inLongTopic.getTopicType())) {
+        } else if (InlongTopicTypeEnum.TUBE.getName().equalsIgnoreCase(inLongTopic.getTopicType())) {
             logger.info("the topic is tube:{}", inLongTopic);
             onlineTubeTopic(inLongTopic);
         } else {
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/AckOffsetOnRebalance.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/AckOffsetOnRebalance.java
index efeec277f..faef2cbe9 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/AckOffsetOnRebalance.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/AckOffsetOnRebalance.java
@@ -19,10 +19,6 @@
 
 package org.apache.inlong.sdk.sort.impl.kafka;
 
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -30,32 +26,36 @@ import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+
 public class AckOffsetOnRebalance implements ConsumerRebalanceListener {
 
     private final Logger logger = LoggerFactory.getLogger(AckOffsetOnRebalance.class);
+    private final String clusterId;
     private final KafkaConsumer<byte[], byte[]> consumer;
     private final ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap;
 
-    public AckOffsetOnRebalance(KafkaConsumer<byte[], byte[]> consumer,
+    public AckOffsetOnRebalance(String clusterId, KafkaConsumer<byte[], byte[]> consumer,
             ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap) {
+        this.clusterId = clusterId;
         this.consumer = consumer;
         this.commitOffsetMap = commitOffsetMap;
     }
 
     @Override
     public void onPartitionsRevoked(Collection<TopicPartition> collection) {
-        logger.debug("*- in ralance:onPartitionsRevoked");
-        while (!commitOffsetMap.isEmpty()) {
-            consumer.commitSync(commitOffsetMap);
-        }
+        logger.debug("execute Rebalance:onPartitionsRevoked");
+        collection.forEach((v) -> {
+            logger.info("clusterId:{},onPartitionsRevoked:{}", clusterId, v.toString());
+        });
     }
 
     @Override
     public void onPartitionsAssigned(Collection<TopicPartition> collection) {
-        logger.debug("*- in ralance:onPartitionsAssigned  ");
-        Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(new HashSet<>(collection));
-        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : committed.entrySet()) {
-            consumer.seek(entry.getKey(), entry.getValue().offset());
-        }
+        logger.debug("execute onPartitionsAssigned");
+        collection.forEach((v) -> {
+            logger.info("clusterId:{},onPartitionsAssigned:{}", clusterId, v.toString());
+        });
     }
 }
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java
index 8dfd278a6..0ba5862f1 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java
@@ -19,29 +19,20 @@
 
 package org.apache.inlong.sdk.sort.impl.kafka;
 
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import com.google.gson.Gson;
+
 import org.apache.inlong.sdk.sort.api.ClientContext;
 import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
 import org.apache.inlong.sdk.sort.api.SortClientConfig.ConsumeStrategy;
 import org.apache.inlong.sdk.sort.entity.InLongMessage;
 import org.apache.inlong.sdk.sort.entity.InLongTopic;
 import org.apache.inlong.sdk.sort.entity.MessageRecord;
-import org.apache.inlong.sdk.sort.util.StringUtil;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
@@ -49,6 +40,17 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
 public class InLongKafkaFetcherImpl extends InLongTopicFetcher {
 
     private final Logger logger = LoggerFactory.getLogger(InLongKafkaFetcherImpl.class);
@@ -68,14 +70,19 @@ public class InLongKafkaFetcherImpl extends InLongTopicFetcher {
         try {
             createKafkaConsumer(bootstrapServers);
             if (consumer != null) {
+                logger.info("start to subscribe topic:{}", new Gson().toJson(inLongTopic));
                 consumer.subscribe(Collections.singletonList(inLongTopic.getTopic()),
-                        new AckOffsetOnRebalance(consumer, commitOffsetMap));
+                        new AckOffsetOnRebalance(this.inLongTopic.getInLongCluster().getClusterId(), consumer,
+                                commitOffsetMap));
             } else {
+                logger.info("consumer is null");
                 return false;
             }
             this.bootstrapServers = bootstrapServers;
-            String threadName = "sort_sdk_fetch_thread_" + StringUtil.formatDate(new Date());
+            String threadName = String.format("sort_sdk_fetch_thread_%s_%s_%d",
+                    this.inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic(), this.hashCode());
             this.fetchThread = new Thread(new Fetcher(), threadName);
+            logger.info("start to start thread:{}", threadName);
             this.fetchThread.start();
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
@@ -156,7 +163,6 @@ public class InLongKafkaFetcherImpl extends InLongTopicFetcher {
     private void createKafkaConsumer(String bootstrapServers) {
         Properties properties = new Properties();
         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, context.getConfig().getSortTaskId());
         properties.put(ConsumerConfig.GROUP_ID_CONFIG, context.getConfig().getSortTaskId());
         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                 ByteArrayDeserializer.class.getName());
@@ -181,10 +187,12 @@ public class InLongKafkaFetcherImpl extends InLongTopicFetcher {
                 context.getConfig().getKafkaFetchWaitMs());
         properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
         properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
-                "org.apache.kafka.clients.consumer.StickyAssignor");
+                RangeAssignor.class.getName());
         properties.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 120000L);
         this.bootstrapServers = bootstrapServers;
+        logger.info("start to create kafka consumer:{}", properties);
         this.consumer = new KafkaConsumer<>(properties);
+        logger.info("end to create kafka consumer:{}", consumer);
     }
 
     public class Fetcher implements Runnable {
@@ -194,10 +202,11 @@ public class InLongKafkaFetcherImpl extends InLongTopicFetcher {
                 try {
                     consumer.commitSync(commitOffsetMap);
                     commitOffsetMap.clear();
-                    //TODO monitor commit succ
+                    // TODO monitor commit succ
 
                 } catch (Exception e) {
-                    //TODO monitor commit fail
+                    // TODO monitor commit fail
+                    logger.error(e.getMessage(), e);
                 }
             }
         }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
index daeb2e766..c80fdbc9f 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -226,10 +227,12 @@ public final class SortSdkSource extends AbstractSource
 
     /**
      * getSortClientConfigParameters
-     * @return
+     * @return Map
      */
     private Map<String, String> getSortClientConfigParameters() {
-        Map<String, String> sortSdkParams = CommonPropertiesHolder.getContext().getSubProperties(SORT_SDK_PREFIX);
+        Map<String, String> sortSdkParams = new HashMap<>();
+        Map<String, String> commonParams = CommonPropertiesHolder.getContext().getSubProperties(SORT_SDK_PREFIX);
+        sortSdkParams.putAll(commonParams);
         SortTaskConfig taskConfig = SortClusterConfigHolder.getTaskConfig(taskName);
         if (taskConfig != null) {
             Map<String, String> sinkParams = taskConfig.getSinkParams();