You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/09/07 03:14:12 UTC

[GitHub] [inlong] healchow commented on a diff in pull request #5800: [INLONG-5796][SDK] Optimize fetcher builders

healchow commented on code in PR #5800:
URL: https://github.com/apache/inlong/pull/5800#discussion_r964341491


##########
inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java:
##########
@@ -209,69 +216,67 @@ public List<InLongTopic> getTopics() {
     @Override
     public boolean updateTopics(List<InLongTopic> topics) {
         if (needUpdate(topics)) {
-            return updateAll(topics);
+            LOGGER.info("need to update topic");
+            newTopics = topics;
+            return true;
         }
         LOGGER.info("no need to update topics");
         return false;
     }
 
-    private boolean updateAll(Collection<InLongTopic> newTopics) {
-        if (CollectionUtils.isEmpty(newTopics)) {
-            LOGGER.error("new topics is empty or null");
-            return false;
-        }
-
-        // stop
-        this.setStopConsume(true);
-
-        // update
-        this.onlineTopics = newTopics.stream().collect(Collectors.toMap(InLongTopic::getTopic, t -> t));
-        InLongTopic topic = onlineTopics.values().stream().findFirst().get();
-        this.seeker = SeekerFactory.createKafkaSeeker(consumer, topic);
-        this.listener = new AckOffsetOnRebalance(topic.getInLongCluster().getClusterId(), seeker,
-                commitOffsetMap, ackOffsetMap);
-        Optional.ofNullable(interceptor).ifPresent(i -> i.configure(topic));
-
-        // subscribe new
-        consumer.subscribe(onlineTopics.keySet(), listener);
-
-        // resume
-        this.setStopConsume(false);
-        return true;
-    }
-
     private void prepareCommit() {
+        List<Long> removeOffsets = new ArrayList<>();
         ackOffsetMap.forEach((topicPartition, tpOffsetMap) -> {
-            synchronized (tpOffsetMap) {
-                // get the remove list
-                List<Long> removeOffsets = new ArrayList<>();
-                long commitOffset = -1;
-                for (Long ackOffset : tpOffsetMap.keySet()) {
-                    if (!tpOffsetMap.get(ackOffset)) {
-                        break;
-                    }
-                    removeOffsets.add(ackOffset);
-                    commitOffset = ackOffset;
-                }
-                // the first haven't ack, do nothing
-                if (commitOffset == -1) {
-                    return;
+            // get the remove list
+            long commitOffset = -1;
+            for (Long ackOffset : tpOffsetMap.keySet()) {
+                if (!tpOffsetMap.get(ackOffset)) {
+                    break;
                 }
-
-                // remove offset and commit offset
-                removeOffsets.forEach(tpOffsetMap::remove);
-                commitOffsetMap.put(topicPartition, new OffsetAndMetadata(commitOffset));
+                removeOffsets.add(ackOffset);
+                commitOffset = ackOffset;
+            }
+            // the first haven't ack, do nothing
+            if (CollectionUtils.isEmpty(removeOffsets)) {
+                return;
             }
+
+            // remove offset and commit offset
+            removeOffsets.forEach(tpOffsetMap::remove);
+            removeOffsets.clear();
+            commitOffsetMap.put(topicPartition, new OffsetAndMetadata(commitOffset));
         });
     }
 
     public class Fetcher implements Runnable {
 
+        private boolean subscribeNew() {
+            if (CollectionUtils.isEmpty(newTopics)) {
+                //LOGGER.info("new topics is empty or null");

Review Comment:
   Please remove this commented-out log statement.



##########
inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java:
##########
@@ -18,79 +18,151 @@
 
 package org.apache.inlong.sdk.sort.fetcher.kafka;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.sdk.sort.api.Seeker;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class AckOffsetOnRebalance implements ConsumerRebalanceListener {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(AckOffsetOnRebalance.class);
+    private static final long DEFAULT_MAX_WAIT_FOR_ACK_TIME = 15000L;
+    private final long maxWaitForAckTime;
     private final String clusterId;
     private final Seeker seeker;
     private final ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap;
     private final ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> ackOffsetMap;
+    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final AtomicLong revokedNum = new AtomicLong(0);
+    private final AtomicLong assignedNum = new AtomicLong(0);
 
     public AckOffsetOnRebalance(String clusterId, Seeker seeker,
                                 ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap) {
-        this(clusterId, seeker, commitOffsetMap, null);
+        this(clusterId, seeker, commitOffsetMap, null, null);
     }
 
     public AckOffsetOnRebalance(
             String clusterId,
             Seeker seeker,
             ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap,
-            ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> ackOffsetMap) {
+            ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> ackOffsetMap,
+            KafkaConsumer<byte[], byte[]> consumer) {
+        this(clusterId, seeker, commitOffsetMap, ackOffsetMap, consumer, DEFAULT_MAX_WAIT_FOR_ACK_TIME);
+    }
+
+    public AckOffsetOnRebalance(
+            String clusterId,
+            Seeker seeker,
+            ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap,
+            ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> ackOffsetMap,
+            KafkaConsumer<byte[], byte[]> consumer,
+            long maxWaitForAckTime) {
         this.clusterId = clusterId;
         this.seeker = seeker;
         this.commitOffsetMap = commitOffsetMap;
         this.ackOffsetMap = ackOffsetMap;
+        this.consumer = consumer;
+        this.maxWaitForAckTime = maxWaitForAckTime;
     }
 
     @Override
     public void onPartitionsRevoked(Collection<TopicPartition> collection) {
-        LOGGER.debug("*- in re-balance:onPartitionsRevoked");
+        LOGGER.info("*- in re-balance:onPartitionsRevoked, it's the {} time", revokedNum.incrementAndGet());
         collection.forEach((v) -> {
-            LOGGER.info("clusterId:{},onPartitionsRevoked:{}", clusterId, v.toString());
+            LOGGER.debug("clusterId:{},onPartitionsRevoked:{}, position is {}",
+                    clusterId, v.toString(), consumer.position(v));
         });
-        if (Objects.nonNull(ackOffsetMap) && Objects.nonNull(commitOffsetMap)) {
-            ackRevokedPartitions(collection);
+
+        try {
+            if (Objects.nonNull(ackOffsetMap) && Objects.nonNull(commitOffsetMap)) {
+                //sleep 10s to wait un ack messages
+                long startTime = System.currentTimeMillis();
+                while (System.currentTimeMillis() - startTime < maxWaitForAckTime && !ackReady(collection)) {
+                    TimeUnit.MILLISECONDS.sleep(1000L);
+                }
+                ackRemovedTopicPartitions(collection);
+            }
+        } catch (Throwable t) {
+            LOGGER.warn("got exception in onPartitionsRevoked : ", t);
         }
     }
 
-    private void ackRevokedPartitions(Collection<TopicPartition> collection) {
-        collection.forEach(tp -> {
-            if (!ackOffsetMap.containsKey(tp)) {
-                return;
+    private boolean ackReady(Collection<TopicPartition> revoked) {
+        for (TopicPartition tp : revoked) {
+            ConcurrentSkipListMap<Long, Boolean> tpMap = ackOffsetMap.get(tp);
+            if (Objects.isNull(tpMap)) {
+                continue;
+            }
+            for (Map.Entry<Long, Boolean> entry : tpMap.entrySet()) {
+                if (!entry.getValue()) {
+                    LOGGER.info("tp {}, offset {} has not been ack, wait", tp, entry.getKey());
+                    return false;
+                }
             }
-            ConcurrentSkipListMap<Long, Boolean> tpOffsetMap = ackOffsetMap.remove(tp);
+        }
+        LOGGER.info("all revoked tp have been ack, re-balance right now.");
+        return true;
+    }
+
+    private void ackRemovedTopicPartitions(Collection<TopicPartition> revoked) {
+        LOGGER.info("ack revoked topic partitions");
+        prepareCommit();
+        consumer.commitSync(commitOffsetMap);
+        // remove revoked topic partitions
+        Set<TopicPartition> keySet = ackOffsetMap.keySet();
+        revoked.stream()
+                .filter(keySet::contains)
+                //.peek(tp -> LOGGER.info("remove tp {}", tp))

Review Comment:
   Please remove this commented-out `peek` call.



##########
inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java:
##########
@@ -18,79 +18,151 @@
 
 package org.apache.inlong.sdk.sort.fetcher.kafka;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.sdk.sort.api.Seeker;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class AckOffsetOnRebalance implements ConsumerRebalanceListener {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(AckOffsetOnRebalance.class);
+    private static final long DEFAULT_MAX_WAIT_FOR_ACK_TIME = 15000L;
+    private final long maxWaitForAckTime;
     private final String clusterId;
     private final Seeker seeker;
     private final ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap;
     private final ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> ackOffsetMap;
+    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final AtomicLong revokedNum = new AtomicLong(0);
+    private final AtomicLong assignedNum = new AtomicLong(0);
 
     public AckOffsetOnRebalance(String clusterId, Seeker seeker,
                                 ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap) {
-        this(clusterId, seeker, commitOffsetMap, null);
+        this(clusterId, seeker, commitOffsetMap, null, null);
     }
 
     public AckOffsetOnRebalance(
             String clusterId,
             Seeker seeker,
             ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap,
-            ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> ackOffsetMap) {
+            ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> ackOffsetMap,
+            KafkaConsumer<byte[], byte[]> consumer) {
+        this(clusterId, seeker, commitOffsetMap, ackOffsetMap, consumer, DEFAULT_MAX_WAIT_FOR_ACK_TIME);
+    }
+
+    public AckOffsetOnRebalance(
+            String clusterId,
+            Seeker seeker,
+            ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap,
+            ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> ackOffsetMap,
+            KafkaConsumer<byte[], byte[]> consumer,
+            long maxWaitForAckTime) {
         this.clusterId = clusterId;
         this.seeker = seeker;
         this.commitOffsetMap = commitOffsetMap;
         this.ackOffsetMap = ackOffsetMap;
+        this.consumer = consumer;
+        this.maxWaitForAckTime = maxWaitForAckTime;
     }
 
     @Override
     public void onPartitionsRevoked(Collection<TopicPartition> collection) {
-        LOGGER.debug("*- in re-balance:onPartitionsRevoked");
+        LOGGER.info("*- in re-balance:onPartitionsRevoked, it's the {} time", revokedNum.incrementAndGet());
         collection.forEach((v) -> {
-            LOGGER.info("clusterId:{},onPartitionsRevoked:{}", clusterId, v.toString());
+            LOGGER.debug("clusterId:{},onPartitionsRevoked:{}, position is {}",
+                    clusterId, v.toString(), consumer.position(v));
         });
-        if (Objects.nonNull(ackOffsetMap) && Objects.nonNull(commitOffsetMap)) {
-            ackRevokedPartitions(collection);
+
+        try {
+            if (Objects.nonNull(ackOffsetMap) && Objects.nonNull(commitOffsetMap)) {
+                //sleep 10s to wait un ack messages

Review Comment:
   Add a single space after `//`.



##########
inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java:
##########
@@ -59,9 +59,10 @@ public class SortClientConfig implements Serializable {
     private int maxEmptyPollSleepMs = 500;
     private int emptyPollTimes = 10;
     private int cleanOldConsumerIntervalSec = 60;
+    private int maxConsumerSize = 5;
 
     public SortClientConfig(String sortTaskId, String sortClusterName, InLongTopicChangeListener assignmentsListener,
-            ConsumeStrategy consumeStrategy, String localIp) {
+                            ConsumeStrategy consumeStrategy, String localIp) {

Review Comment:
   There is no need to change the indentation here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org