You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/07/18 07:37:30 UTC
[inlong] branch master updated: [INLONG-5092][SDK] Change Kafka default partition assignment strategy to RangeAssignor (#5093)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 785665410 [INLONG-5092][SDK] Change Kafka default partition assignment strategy to RangeAssignor (#5093)
785665410 is described below
commit 7856654107fe7031bade71ae869e9c2422eabd68
Author: 卢春亮 <94...@qq.com>
AuthorDate: Mon Jul 18 15:37:24 2022 +0800
[INLONG-5092][SDK] Change Kafka default partition assignment strategy to RangeAssignor (#5093)
---
.../sink/pulsarzone/PulsarClusterProducer.java | 50 ++++++++++++++++------
.../sdk/sort/impl/InLongTopicManagerImpl.java | 18 ++++----
.../sdk/sort/impl/kafka/AckOffsetOnRebalance.java | 28 ++++++------
.../sort/impl/kafka/InLongKafkaFetcherImpl.java | 45 +++++++++++--------
.../standalone/source/sortsdk/SortSdkSource.java | 7 ++-
5 files changed, 91 insertions(+), 57 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarClusterProducer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarClusterProducer.java
index 7a13fbbc1..6900baa21 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarClusterProducer.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarClusterProducer.java
@@ -98,9 +98,9 @@ public class PulsarClusterProducer implements LifecycleAware {
/**
* Constructor
*
- * @param workerName
- * @param config
- * @param context
+ * @param workerName Worker name
+ * @param config Cache cluster configuration
+ * @param context Sink context
*/
public PulsarClusterProducer(String workerName, CacheClusterConfig config, PulsarZoneSinkContext context) {
this.workerName = workerName;
@@ -109,8 +109,8 @@ public class PulsarClusterProducer implements LifecycleAware {
this.context = context.getProducerContext();
this.state = LifecycleState.IDLE;
this.cacheClusterName = config.getClusterName();
- this.tenant = config.getParams().getOrDefault(KEY_TENANT, "pulsar");
- this.namespace = config.getParams().getOrDefault(KEY_NAMESPACE, "inlong");
+ this.tenant = config.getParams().get(KEY_TENANT);
+ this.namespace = config.getParams().get(KEY_NAMESPACE);
}
/**
@@ -160,7 +160,7 @@ public class PulsarClusterProducer implements LifecycleAware {
/**
* getPulsarCompressionType
*
- * @return CompressionType
+ * @return CompressionType LZ4/NONE/ZLIB/ZSTD/SNAPPY
*/
private CompressionType getPulsarCompressionType() {
String type = this.context.getString(KEY_COMPRESSIONTYPE, CompressionType.SNAPPY.name());
@@ -204,7 +204,7 @@ public class PulsarClusterProducer implements LifecycleAware {
/**
* getLifecycleState
*
- * @return
+ * @return LifecycleState state
*/
@Override
public LifecycleState getLifecycleState() {
@@ -212,21 +212,21 @@ public class PulsarClusterProducer implements LifecycleAware {
}
/**
- * send
+ * send DispatchProfile
*
- * @param event
+ * @param event DispatchProfile
+ * @return boolean sendResult
*/
public boolean send(DispatchProfile event) {
try {
// topic
- String baseTopic = sinkContext.getIdTopicHolder().getTopic(event.getUid());
- if (baseTopic == null) {
+ String producerTopic = this.getProducerTopic(event);
+ if (producerTopic == null) {
sinkContext.addSendResultMetric(event, event.getUid(), false, 0);
event.fail();
return false;
}
// get producer
- String producerTopic = tenant + '/' + namespace + '/' + baseTopic;
Producer<byte[]> producer = this.producerMap.get(producerTopic);
if (producer == null) {
try {
@@ -282,11 +282,33 @@ public class PulsarClusterProducer implements LifecycleAware {
}
}
+ /**
+ * getProducerTopic
+ *
+ * @param event DispatchProfile
+ * @return String Full topic name
+ */
+ private String getProducerTopic(DispatchProfile event) {
+ String baseTopic = sinkContext.getIdTopicHolder().getTopic(event.getUid());
+ if (baseTopic == null) {
+ return null;
+ }
+ StringBuilder builder = new StringBuilder();
+ if (tenant != null) {
+ builder.append(tenant).append("/");
+ }
+ if (namespace != null) {
+ builder.append(namespace).append("/");
+ }
+ builder.append(baseTopic);
+ return builder.toString();
+ }
+
/**
* encodeCacheMessageHeaders
*
- * @param event
- * @return Map
+ * @param event DispatchProfile
+ * @return Map cache message headers
*/
public Map<String, String> encodeCacheMessageHeaders(DispatchProfile event) {
Map<String, String> headers = new HashMap<>();
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InLongTopicManagerImpl.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InLongTopicManagerImpl.java
index 250570b1b..65f3cc724 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InLongTopicManagerImpl.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InLongTopicManagerImpl.java
@@ -76,13 +76,13 @@ public class InLongTopicManagerImpl extends InLongTopicManager {
}
private boolean initFetcher(InLongTopicFetcher fetcher, InLongTopic inLongTopic) {
- if (InlongTopicTypeEnum.PULSAR.getName().equals(inLongTopic.getTopicType())) {
+ if (InlongTopicTypeEnum.PULSAR.getName().equalsIgnoreCase(inLongTopic.getTopicType())) {
logger.info("create fetcher topic is pulsar {}", inLongTopic);
return fetcher.init(pulsarClients.get(inLongTopic.getInLongCluster().getClusterId()));
- } else if (InlongTopicTypeEnum.KAFKA.getName().equals(inLongTopic.getTopicType())) {
+ } else if (InlongTopicTypeEnum.KAFKA.getName().equalsIgnoreCase(inLongTopic.getTopicType())) {
logger.info("create fetcher topic is kafka {}", inLongTopic);
return fetcher.init(inLongTopic.getInLongCluster().getBootstraps());
- } else if (InlongTopicTypeEnum.TUBE.getName().equals(inLongTopic.getTopicType())) {
+ } else if (InlongTopicTypeEnum.TUBE.getName().equalsIgnoreCase(inLongTopic.getTopicType())) {
logger.info("create fetcher topic is tube {}", inLongTopic);
return fetcher.init(tubeFactories.get(inLongTopic.getInLongCluster().getClusterId()));
} else {
@@ -130,13 +130,13 @@ public class InLongTopicManagerImpl extends InLongTopicManager {
* @return {@link InLongTopicFetcher}
*/
private InLongTopicFetcher createInLongTopicFetcher(InLongTopic inLongTopic) {
- if (InlongTopicTypeEnum.PULSAR.getName().equals(inLongTopic.getTopicType())) {
+ if (InlongTopicTypeEnum.PULSAR.getName().equalsIgnoreCase(inLongTopic.getTopicType())) {
logger.info("the topic is pulsar {}", inLongTopic);
return new InLongPulsarFetcherImpl(inLongTopic, context);
- } else if (InlongTopicTypeEnum.KAFKA.getName().equals(inLongTopic.getTopicType())) {
+ } else if (InlongTopicTypeEnum.KAFKA.getName().equalsIgnoreCase(inLongTopic.getTopicType())) {
logger.info("the topic is kafka {}", inLongTopic);
return new InLongKafkaFetcherImpl(inLongTopic, context);
- } else if (InlongTopicTypeEnum.TUBE.getName().equals(inLongTopic.getTopicType())) {
+ } else if (InlongTopicTypeEnum.TUBE.getName().equalsIgnoreCase(inLongTopic.getTopicType())) {
logger.info("the topic is tube {}", inLongTopic);
return new InLongTubeFetcherImpl(inLongTopic, context);
} else {
@@ -345,13 +345,13 @@ public class InLongTopicManagerImpl extends InLongTopicManager {
}
private void onlineTopic(InLongTopic inLongTopic) {
- if (InlongTopicTypeEnum.PULSAR.getName().equals(inLongTopic.getTopicType())) {
+ if (InlongTopicTypeEnum.PULSAR.getName().equalsIgnoreCase(inLongTopic.getTopicType())) {
logger.info("the topic is pulsar:{}", inLongTopic);
onlinePulsarTopic(inLongTopic);
- } else if (InlongTopicTypeEnum.KAFKA.getName().equals(inLongTopic.getTopicType())) {
+ } else if (InlongTopicTypeEnum.KAFKA.getName().equalsIgnoreCase(inLongTopic.getTopicType())) {
logger.info("the topic is kafka:{}", inLongTopic);
onlineKafkaTopic(inLongTopic);
- } else if (InlongTopicTypeEnum.TUBE.getName().equals(inLongTopic.getTopicType())) {
+ } else if (InlongTopicTypeEnum.TUBE.getName().equalsIgnoreCase(inLongTopic.getTopicType())) {
logger.info("the topic is tube:{}", inLongTopic);
onlineTubeTopic(inLongTopic);
} else {
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/AckOffsetOnRebalance.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/AckOffsetOnRebalance.java
index efeec277f..faef2cbe9 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/AckOffsetOnRebalance.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/AckOffsetOnRebalance.java
@@ -19,10 +19,6 @@
package org.apache.inlong.sdk.sort.impl.kafka;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -30,32 +26,36 @@ import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+
public class AckOffsetOnRebalance implements ConsumerRebalanceListener {
private final Logger logger = LoggerFactory.getLogger(AckOffsetOnRebalance.class);
+ private final String clusterId;
private final KafkaConsumer<byte[], byte[]> consumer;
private final ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap;
- public AckOffsetOnRebalance(KafkaConsumer<byte[], byte[]> consumer,
+ public AckOffsetOnRebalance(String clusterId, KafkaConsumer<byte[], byte[]> consumer,
ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap) {
+ this.clusterId = clusterId;
this.consumer = consumer;
this.commitOffsetMap = commitOffsetMap;
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
- logger.debug("*- in ralance:onPartitionsRevoked");
- while (!commitOffsetMap.isEmpty()) {
- consumer.commitSync(commitOffsetMap);
- }
+ logger.debug("execute Rebalance:onPartitionsRevoked");
+ collection.forEach((v) -> {
+ logger.info("clusterId:{},onPartitionsRevoked:{}", clusterId, v.toString());
+ });
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
- logger.debug("*- in ralance:onPartitionsAssigned ");
- Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(new HashSet<>(collection));
- for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : committed.entrySet()) {
- consumer.seek(entry.getKey(), entry.getValue().offset());
- }
+ logger.debug("execute onPartitionsAssigned");
+ collection.forEach((v) -> {
+ logger.info("clusterId:{},onPartitionsAssigned:{}", clusterId, v.toString());
+ });
}
}
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java
index 8dfd278a6..0ba5862f1 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java
@@ -19,29 +19,20 @@
package org.apache.inlong.sdk.sort.impl.kafka;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import com.google.gson.Gson;
+
import org.apache.inlong.sdk.sort.api.ClientContext;
import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
import org.apache.inlong.sdk.sort.api.SortClientConfig.ConsumeStrategy;
import org.apache.inlong.sdk.sort.entity.InLongMessage;
import org.apache.inlong.sdk.sort.entity.InLongTopic;
import org.apache.inlong.sdk.sort.entity.MessageRecord;
-import org.apache.inlong.sdk.sort.util.StringUtil;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
@@ -49,6 +40,17 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
public class InLongKafkaFetcherImpl extends InLongTopicFetcher {
private final Logger logger = LoggerFactory.getLogger(InLongKafkaFetcherImpl.class);
@@ -68,14 +70,19 @@ public class InLongKafkaFetcherImpl extends InLongTopicFetcher {
try {
createKafkaConsumer(bootstrapServers);
if (consumer != null) {
+ logger.info("start to subscribe topic:{}", new Gson().toJson(inLongTopic));
consumer.subscribe(Collections.singletonList(inLongTopic.getTopic()),
- new AckOffsetOnRebalance(consumer, commitOffsetMap));
+ new AckOffsetOnRebalance(this.inLongTopic.getInLongCluster().getClusterId(), consumer,
+ commitOffsetMap));
} else {
+ logger.info("consumer is null");
return false;
}
this.bootstrapServers = bootstrapServers;
- String threadName = "sort_sdk_fetch_thread_" + StringUtil.formatDate(new Date());
+ String threadName = String.format("sort_sdk_fetch_thread_%s_%s_%d",
+ this.inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic(), this.hashCode());
this.fetchThread = new Thread(new Fetcher(), threadName);
+ logger.info("start to start thread:{}", threadName);
this.fetchThread.start();
} catch (Exception e) {
logger.error(e.getMessage(), e);
@@ -156,7 +163,6 @@ public class InLongKafkaFetcherImpl extends InLongTopicFetcher {
private void createKafkaConsumer(String bootstrapServers) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- properties.put(ConsumerConfig.CLIENT_ID_CONFIG, context.getConfig().getSortTaskId());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, context.getConfig().getSortTaskId());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
@@ -181,10 +187,12 @@ public class InLongKafkaFetcherImpl extends InLongTopicFetcher {
context.getConfig().getKafkaFetchWaitMs());
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
- "org.apache.kafka.clients.consumer.StickyAssignor");
+ RangeAssignor.class.getName());
properties.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 120000L);
this.bootstrapServers = bootstrapServers;
+ logger.info("start to create kafka consumer:{}", properties);
this.consumer = new KafkaConsumer<>(properties);
+ logger.info("end to create kafka consumer:{}", consumer);
}
public class Fetcher implements Runnable {
@@ -194,10 +202,11 @@ public class InLongKafkaFetcherImpl extends InLongTopicFetcher {
try {
consumer.commitSync(commitOffsetMap);
commitOffsetMap.clear();
- //TODO monitor commit succ
+ // TODO monitor commit succ
} catch (Exception e) {
- //TODO monitor commit fail
+ // TODO monitor commit fail
+ logger.error(e.getMessage(), e);
}
}
}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
index daeb2e766..c80fdbc9f 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -226,10 +227,12 @@ public final class SortSdkSource extends AbstractSource
/**
* getSortClientConfigParameters
- * @return
+ * @return Map
*/
private Map<String, String> getSortClientConfigParameters() {
- Map<String, String> sortSdkParams = CommonPropertiesHolder.getContext().getSubProperties(SORT_SDK_PREFIX);
+ Map<String, String> sortSdkParams = new HashMap<>();
+ Map<String, String> commonParams = CommonPropertiesHolder.getContext().getSubProperties(SORT_SDK_PREFIX);
+ sortSdkParams.putAll(commonParams);
SortTaskConfig taskConfig = SortClusterConfigHolder.getTaskConfig(taskName);
if (taskConfig != null) {
Map<String, String> sinkParams = taskConfig.getSinkParams();