Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/25 04:42:48 UTC

[inlong] branch master updated: [INLONG-5623][SDK] Support multi-topic fetcher for Kafka (#5659)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f8d7f8bba [INLONG-5623][SDK] Support multi-topic fetcher for Kafka (#5659)
f8d7f8bba is described below

commit f8d7f8bba6a6737db899c0cc6e4070affdb24d1a
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Thu Aug 25 12:42:41 2022 +0800

    [INLONG-5623][SDK] Support multi-topic fetcher for Kafka (#5659)
---
 .../inlong/sdk/sort/api/MultiTopicsFetcher.java    |  17 +
 .../sort/fetcher/kafka/AckOffsetOnRebalance.java   |  36 ++
 .../fetcher/kafka/KafkaMultiTopicsFetcher.java     | 388 +++++++++++++++++++++
 .../fetcher/pulsar/PulsarMultiTopicsFetcher.java   |  16 -
 4 files changed, 441 insertions(+), 16 deletions(-)

diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/MultiTopicsFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/MultiTopicsFetcher.java
index 7f7ab6ec4..557ebaa1b 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/MultiTopicsFetcher.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/MultiTopicsFetcher.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sdk.sort.api;
 
 import org.apache.inlong.sdk.sort.entity.InLongTopic;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -65,4 +66,20 @@ public abstract class MultiTopicsFetcher implements TopicFetcher {
         this.executor = Executors.newSingleThreadScheduledExecutor();
     }
 
+    protected boolean needUpdate(Collection<InLongTopic> newTopics) {
+        if (newTopics.size() != onlineTopics.size()) {
+            return true;
+        }
+        // all topics in one task should share the same properties
+        if (!Objects.equals(newTopics.stream().findFirst(), onlineTopics.values().stream().findFirst())) {
+            return true;
+        }
+        for (InLongTopic topic : newTopics) {
+            if (!onlineTopics.containsKey(topic.getTopic())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
 }
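
The needUpdate() contract above can be exercised on its own. Below is a minimal, self-contained sketch of the same update-detection idea, using a simplified Topic stand-in rather than the SDK's InLongTopic type (the stand-in and class names here are made up for illustration):

    import java.util.*;

    public class NeedUpdateSketch {
        record Topic(String name, String properties) { }

        static boolean needUpdate(Collection<Topic> newTopics, Map<String, Topic> onlineTopics) {
            if (newTopics.size() != onlineTopics.size()) {
                return true;
            }
            // all topics in one task share the same properties,
            // so comparing one representative from each side is enough
            if (!Objects.equals(newTopics.stream().findFirst(),
                    onlineTopics.values().stream().findFirst())) {
                return true;
            }
            for (Topic t : newTopics) {
                if (!onlineTopics.containsKey(t.name())) {
                    return true;
                }
            }
            return false;
        }

        public static void main(String[] args) {
            Map<String, Topic> online = Map.of("t1", new Topic("t1", "p"));
            System.out.println(needUpdate(List.of(new Topic("t1", "p")), online)); // false: same set, same properties
            System.out.println(needUpdate(List.of(new Topic("t1", "q")), online)); // true: properties changed
            System.out.println(needUpdate(List.of(new Topic("t2", "p")), online)); // true: different topic set
        }
    }
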
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java
index 1aec582fb..4c8d456c6 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java
@@ -26,7 +26,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 
 public class AckOffsetOnRebalance implements ConsumerRebalanceListener {
 
@@ -34,12 +36,22 @@ public class AckOffsetOnRebalance implements ConsumerRebalanceListener {
     private final String clusterId;
     private final Seeker seeker;
     private final ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap;
+    private final ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> ackOffsetMap;
 
     public AckOffsetOnRebalance(String clusterId, Seeker seeker,
                                 ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap) {
+        this(clusterId, seeker, commitOffsetMap, null);
+    }
+
+    public AckOffsetOnRebalance(
+            String clusterId,
+            Seeker seeker,
+            ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap,
+            ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> ackOffsetMap) {
         this.clusterId = clusterId;
         this.seeker = seeker;
         this.commitOffsetMap = commitOffsetMap;
+        this.ackOffsetMap = ackOffsetMap;
     }
 
     @Override
@@ -48,6 +60,30 @@ public class AckOffsetOnRebalance implements ConsumerRebalanceListener {
         collection.forEach((v) -> {
             LOGGER.info("clusterId:{},onPartitionsRevoked:{}", clusterId, v.toString());
         });
+        if (Objects.nonNull(ackOffsetMap) && Objects.nonNull(commitOffsetMap)) {
+            ackRevokedPartitions(collection);
+        }
+    }
+
+    private void ackRevokedPartitions(Collection<TopicPartition> collection) {
+        collection.forEach(tp -> {
+            if (!ackOffsetMap.containsKey(tp)) {
+                return;
+            }
+            ConcurrentSkipListMap<Long, Boolean> tpOffsetMap = ackOffsetMap.remove(tp);
+            long commitOffset = -1;
+            for (Long ackOffset : tpOffsetMap.keySet()) {
+                if (!tpOffsetMap.get(ackOffset)) {
+                    break;
+                }
+                commitOffset = ackOffset;
+            }
+            // the first offset has not been acked yet, nothing to commit
+            if (commitOffset == -1) {
+                return;
+            }
+            commitOffsetMap.put(tp, new OffsetAndMetadata(commitOffset));
+        });
     }
 
     @Override
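
The scan in ackRevokedPartitions() above (and in prepareCommit() of the Kafka fetcher below) commits only the highest contiguously-acked offset, so an ack that arrives out of order can never cause an in-flight message to be skipped. A minimal, runnable sketch of that scan (not SDK code):

    import java.util.concurrent.ConcurrentSkipListMap;

    public class ContiguousAckSketch {
        static long highestContiguousAcked(ConcurrentSkipListMap<Long, Boolean> tpOffsetMap) {
            long commitOffset = -1;
            for (Long ackOffset : tpOffsetMap.keySet()) {
                if (!tpOffsetMap.get(ackOffset)) {
                    break;                 // first gap: this offset is still in flight
                }
                commitOffset = ackOffset;  // everything up to here has been acked
            }
            return commitOffset;           // -1 means nothing is safe to commit yet
        }

        public static void main(String[] args) {
            ConcurrentSkipListMap<Long, Boolean> map = new ConcurrentSkipListMap<>();
            map.put(10L, true);
            map.put(11L, true);
            map.put(12L, false); // not acked yet
            map.put(13L, true);  // acked out of order
            System.out.println(highestContiguousAcked(map)); // 11
        }
    }
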
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java
new file mode 100644
index 000000000..2a4bbdd15
--- /dev/null
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.sort.fetcher.kafka;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.Deserializer;
+import org.apache.inlong.sdk.sort.api.Interceptor;
+import org.apache.inlong.sdk.sort.api.MultiTopicsFetcher;
+import org.apache.inlong.sdk.sort.api.SeekerFactory;
+import org.apache.inlong.sdk.sort.api.SortClientConfig;
+import org.apache.inlong.sdk.sort.entity.InLongMessage;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Kafka multi topics fetcher
+ */
+public class KafkaMultiTopicsFetcher extends MultiTopicsFetcher {
+    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMultiTopicsFetcher.class);
+    private final ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap;
+    private final ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> ackOffsetMap;
+    private final String bootstrapServers;
+    private ConsumerRebalanceListener listener;
+    private KafkaConsumer<byte[], byte[]> consumer;
+
+    public KafkaMultiTopicsFetcher(
+            List<InLongTopic> topics,
+            ClientContext context,
+            Interceptor interceptor,
+            Deserializer deserializer,
+            String bootstrapServers) {
+        super(topics, context, interceptor, deserializer);
+        this.bootstrapServers = bootstrapServers;
+        this.commitOffsetMap = new ConcurrentHashMap<>();
+        this.ackOffsetMap = new ConcurrentHashMap<>();
+    }
+
+    @Override
+    public boolean init() {
+        try {
+            this.consumer = createKafkaConsumer();
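+            // all topics in one task share the same cluster config, so any one can serve as the representative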
+            InLongTopic topic = onlineTopics.values().stream().findFirst().get();
+            this.seeker = SeekerFactory.createKafkaSeeker(consumer, topic);
+            this.listener = new AckOffsetOnRebalance(topic.getInLongCluster().getClusterId(), seeker,
+                    commitOffsetMap);
+            consumer.subscribe(onlineTopics.keySet(), listener);
+            return true;
+        } catch (Throwable t) {
+            LOGGER.error("failed to init kafka consumer: ", t);
+            return false;
+        }
+    }
+
+    private KafkaConsumer<byte[], byte[]> createKafkaConsumer() {
+        Properties properties = new Properties();
+        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        properties.put(ConsumerConfig.GROUP_ID_CONFIG, context.getConfig().getSortTaskId());
+        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                ByteArrayDeserializer.class.getName());
+        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                ByteArrayDeserializer.class.getName());
+        properties.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG,
+                context.getConfig().getKafkaSocketRecvBufferSize());
+        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+        SortClientConfig.ConsumeStrategy offsetResetStrategy = context.getConfig().getOffsetResetStrategy();
+        if (offsetResetStrategy == SortClientConfig.ConsumeStrategy.lastest
+                || offsetResetStrategy == SortClientConfig.ConsumeStrategy.lastest_absolutely) {
+            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+        } else if (offsetResetStrategy == SortClientConfig.ConsumeStrategy.earliest
+                || offsetResetStrategy == SortClientConfig.ConsumeStrategy.earliest_absolutely) {
+            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        } else {
+            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
+        }
+        properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,
+                context.getConfig().getKafkaFetchSizeBytes());
+        properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,
+                context.getConfig().getKafkaFetchWaitMs());
+        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
+                RangeAssignor.class.getName());
+        properties.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 120000L);
+        LOGGER.info("start to create kafka consumer:{}", properties);
+        return new KafkaConsumer<>(properties);
+    }
+
+    @Override
+    public void ack(String msgOffset) throws Exception {
+        // the msg offset format of the multi-topic kafka fetcher is topic:partitionId:offset, e.g. topic1:20:1746839
+        String[] offset = msgOffset.split(":");
+        if (offset.length != 3) {
+            throw new Exception("offset is illegal, the correct format is topic:partitionId:offset, "
+                    + "the error offset is:" + msgOffset);
+        }
+
+        // parse topic partition offset
+        TopicPartition topicPartition = new TopicPartition(offset[0], Integer.parseInt(offset[1]));
+        long ackOffset = Long.parseLong(offset[2]);
+
+        // ack
+        if (!ackOffsetMap.containsKey(topicPartition) || !ackOffsetMap.get(topicPartition).containsKey(ackOffset)) {
+            LOGGER.warn("did not find offsetMap or ack offset of {}, offset {}, just ignore it",
+                    topicPartition, ackOffset);
+            return;
+        }
+
+        // mark this offset as acked
+        ConcurrentSkipListMap<Long, Boolean> tpOffsetMap = ackOffsetMap.get(topicPartition);
+        // to prevent race condition in AckOffsetOnRebalance::onPartitionsRevoked
+        if (Objects.nonNull(tpOffsetMap)) {
+            tpOffsetMap.put(ackOffset, true);
+        }
+    }
+
+    @Override
+    public void pause() {
+        consumer.pause(consumer.assignment());
+    }
+
+    @Override
+    public void resume() {
+        consumer.resume(consumer.assignment());
+    }
+
+    @Override
+    public boolean close() {
+        this.closed = true;
+        try {
+            if (fetchThread != null) {
+                fetchThread.interrupt();
+            }
+            if (consumer != null) {
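+                // fold contiguous acks into commitOffsetMap, then commit synchronously before closing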
+                prepareCommit();
+                consumer.commitSync(commitOffsetMap);
+                consumer.close();
+            }
+            commitOffsetMap.clear();
+        } catch (Throwable t) {
+            LOGGER.warn("got exception in multi topic fetcher close: ", t);
+        }
+        LOGGER.info("closed kafka multi topic fetcher");
+        return true;
+    }
+
+    @Override
+    public boolean isClosed() {
+        return closed;
+    }
+
+    @Override
+    public void setStopConsume(boolean stopConsume) {
+        this.stopConsume = stopConsume;
+    }
+
+    @Override
+    public boolean isStopConsume() {
+        return stopConsume;
+    }
+
+    @Override
+    public List<InLongTopic> getTopics() {
+        return new ArrayList<>(onlineTopics.values());
+    }
+
+    @Override
+    public boolean updateTopics(List<InLongTopic> topics) {
+        if (needUpdate(topics)) {
+            return updateAll(topics);
+        }
+        LOGGER.info("no need to update topics");
+        return false;
+    }
+
+    private boolean updateAll(Collection<InLongTopic> newTopics) {
+        if (CollectionUtils.isEmpty(newTopics)) {
+            LOGGER.error("new topics is empty or null");
+            return false;
+        }
+
+        // stop
+        this.setStopConsume(true);
+
+        // update
+        this.onlineTopics = newTopics.stream().collect(Collectors.toMap(InLongTopic::getTopic, t -> t));
+        InLongTopic topic = onlineTopics.values().stream().findFirst().get();
+        this.seeker = SeekerFactory.createKafkaSeeker(consumer, topic);
+        this.listener = new AckOffsetOnRebalance(topic.getInLongCluster().getClusterId(), seeker,
+                commitOffsetMap, ackOffsetMap);
+        Optional.ofNullable(interceptor).ifPresent(i -> i.configure(topic));
+
+        // subscribe new
+        consumer.subscribe(onlineTopics.keySet(), listener);
+
+        // resume
+        this.setStopConsume(false);
+        return true;
+    }
+
+    private void prepareCommit() {
+        ackOffsetMap.forEach((topicPartition, tpOffsetMap) -> {
+            synchronized (tpOffsetMap) {
+                // get the remove list
+                List<Long> removeOffsets = new ArrayList<>();
+                long commitOffset = -1;
+                for (Long ackOffset : tpOffsetMap.keySet()) {
+                    if (!tpOffsetMap.get(ackOffset)) {
+                        break;
+                    }
+                    removeOffsets.add(ackOffset);
+                    commitOffset = ackOffset;
+                }
+                // the first offset has not been acked yet, nothing to commit
+                if (commitOffset == -1) {
+                    return;
+                }
+
+                // remove offset and commit offset
+                removeOffsets.forEach(tpOffsetMap::remove);
+                commitOffsetMap.put(topicPartition, new OffsetAndMetadata(commitOffset));
+            }
+        });
+    }
+
+    public class Fetcher implements Runnable {
+
+        private void commitKafkaOffset() {
+            prepareCommit();
+            if (consumer != null) {
+                try {
+                    consumer.commitAsync(commitOffsetMap, null);
+                    commitOffsetMap.clear();
+                } catch (Exception e) {
+                    LOGGER.error("commit kafka offset failed: ", e);
+                }
+            }
+        }
+
+        /**
+         * pass the received messages to the onFinishedBatch callback
+         *
+         * @param messageRecords the list of {@link MessageRecord} to deliver
+         */
+        private void handleAndCallbackMsg(List<MessageRecord> messageRecords) {
+            long start = System.currentTimeMillis();
+            try {
+                context.getDefaultStateCounter().addCallbackTimes(1);
+                context.getConfig().getCallback().onFinishedBatch(messageRecords);
+                context.getDefaultStateCounter()
+                        .addCallbackTimeCost(System.currentTimeMillis() - start)
+                        .addCallbackDoneTimes(1);
+            } catch (Exception e) {
+                context.getDefaultStateCounter().addCallbackErrorTimes(1);
+                LOGGER.error("failed to callback: ", e);
+            }
+        }
+
+        private String getOffset(String topic, int partitionId, long offset) {
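+            // register the offset as un-acked before the message reaches the callback;
+            // ack(msgOffset) flips it to true once the business side confirms it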
+            TopicPartition topicPartition = new TopicPartition(topic, partitionId);
+            ackOffsetMap.computeIfAbsent(topicPartition, k -> new ConcurrentSkipListMap<>()).put(offset, false);
+            return topic + ":" + partitionId + ":" + offset;
+        }
+
+        private Map<String, String> getMsgHeaders(Headers headers) {
+            Map<String, String> headerMap = new HashMap<>();
+            for (Header header : headers) {
+                headerMap.put(header.key(), new String(header.value()));
+            }
+            return headerMap;
+        }
+
+        @Override
+        public void run() {
+            boolean hasPermit;
+            while (!closed) {
+                hasPermit = false;
+                try {
+                    if (context.getConfig().isStopConsume() || stopConsume) {
+                        TimeUnit.MILLISECONDS.sleep(50);
+                        continue;
+                    }
+
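+                    // back off if recent polls returned nothing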
+                    if (sleepTime > 0) {
+                        TimeUnit.MILLISECONDS.sleep(sleepTime);
+                    }
+
+                    context.acquireRequestPermit();
+                    hasPermit = true;
+                    // fetch from kafka
+                    fetchFromKafka();
+                    // commit
+                    commitKafkaOffset();
+                } catch (Exception e) {
+                    context.getDefaultStateCounter().addFetchErrorTimes(1);
+                    LOGGER.error("failed in kafka multi topic fetcher: ", e);
+                } finally {
+                    if (hasPermit) {
+                        context.releaseRequestPermit();
+                    }
+                }
+            }
+        }
+
+        private void fetchFromKafka() throws Exception {
+            context.getDefaultStateCounter().addMsgCount(1).addFetchTimes(1);
+
+            long startFetchTime = System.currentTimeMillis();
+            ConsumerRecords<byte[], byte[]> records = consumer
+                    .poll(Duration.ofMillis(context.getConfig().getKafkaFetchWaitMs()));
+            context.getDefaultStateCounter().addFetchTimeCost(System.currentTimeMillis() - startFetchTime);
+            if (null != records && !records.isEmpty()) {
+
+                for (ConsumerRecord<byte[], byte[]> msg : records) {
+                    List<MessageRecord> msgs = new ArrayList<>();
+                    String topicName = msg.topic();
+                    InLongTopic topic = onlineTopics.get(topicName);
+                    String offsetKey = getOffset(topicName, msg.partition(), msg.offset());
+                    List<InLongMessage> inLongMessages = deserializer
+                            .deserialize(context, topic, getMsgHeaders(msg.headers()), msg.value());
+                    inLongMessages = interceptor.intercept(inLongMessages);
+                    if (inLongMessages.isEmpty()) {
+                        ack(offsetKey);
+                        continue;
+                    }
+
+                    msgs.add(new MessageRecord(topic.getTopicKey(),
+                            inLongMessages,
+                            offsetKey, System.currentTimeMillis()));
+                    context.getStateCounterByTopic(topic).addConsumeSize(msg.value().length);
+                    context.getStateCounterByTopic(topic).addMsgCount(msgs.size());
+                    handleAndCallbackMsg(msgs);
+                }
+                sleepTime = 0L;
+            } else {
+                context.getDefaultStateCounter().addEmptyFetchTimes(1);
+                emptyFetchTimes++;
+                if (emptyFetchTimes >= context.getConfig().getEmptyPollTimes()) {
+                    sleepTime = Math.min(sleepTime + context.getConfig().getEmptyPollSleepStepMs(),
+                            context.getConfig().getMaxEmptyPollSleepMs());
+                    emptyFetchTimes = 0;
+                }
+            }
+        }
+    }
+}
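
The msgOffset strings produced by getOffset() and consumed by ack() follow the topic:partitionId:offset layout noted in the code. A hypothetical round-trip helper illustrating that format (the class and method names here are invented for the example; only TopicPartition is the real Kafka type):

    import org.apache.kafka.common.TopicPartition;

    public class OffsetKeySketch {
        static String buildOffsetKey(String topic, int partitionId, long offset) {
            return topic + ":" + partitionId + ":" + offset;
        }

        static TopicPartition parseTopicPartition(String msgOffset) {
            String[] parts = msgOffset.split(":");
            if (parts.length != 3) {
                throw new IllegalArgumentException(
                        "expected topic:partitionId:offset, got: " + msgOffset);
            }
            return new TopicPartition(parts[0], Integer.parseInt(parts[1]));
        }

        public static void main(String[] args) {
            String key = buildOffsetKey("topic1", 20, 1746839L);
            System.out.println(key);                      // topic1:20:1746839
            System.out.println(parseTopicPartition(key)); // topic1-20
        }
    }
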
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java
index 3244e9d5b..938869bf7 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java
@@ -304,22 +304,6 @@ public class PulsarMultiTopicsFetcher extends MultiTopicsFetcher {
         return false;
     }
 
-    private boolean needUpdate(Collection<InLongTopic> newTopics) {
-        if (newTopics.size() != onlineTopics.size()) {
-            return true;
-        }
-        // all topic should share the same properties in one task
-        if (Objects.equals(newTopics.stream().findFirst(), onlineTopics.values().stream().findFirst())) {
-            return true;
-        }
-        for (InLongTopic topic : newTopics) {
-            if (!onlineTopics.containsKey(topic.getTopic())) {
-                return true;
-            }
-        }
-        return false;
-    }
-
     public class Fetcher implements Runnable {
 
         /**