Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/09/06 07:52:27 UTC

[GitHub] [inlong] vernedeng opened a new pull request, #5800: [INLONG-5796] Optimize fetcher builders

vernedeng opened a new pull request, #5800:
URL: https://github.com/apache/inlong/pull/5800

   
   - Fixes #5796 
   
   ### Motivation
   
   1. Unify the single-topic and multi-topic fetchers into one builder. Differentiate them by
   2. Add a "fetch key" parameter that uniquely identifies each fetcher (a hedged builder sketch follows this list)
   3. Optimize the Kafka rebalance process: sleep 1s at a time until all in-flight packets have been acked
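   
   A minimal sketch of items 1 and 2, assuming the unified builder distinguishes single- from multi-topic fetchers by the number of subscribed topics; all names here (`FetcherBuilder`, `TopicFetcher`, `fetchKey`) are hypothetical, not the sort-sdk's actual API:
   
   ```java
   // Hypothetical sketch only; names are illustrative, not the sort-sdk's API.
   import java.util.ArrayList;
   import java.util.List;
   
   public class UnifiedFetcherBuilderSketch {
   
       interface TopicFetcher {
           String getFetchKey();
       }
   
       static class SingleTopicFetcher implements TopicFetcher {
           private final String topic;
           private final String fetchKey;
   
           SingleTopicFetcher(String topic, String fetchKey) {
               this.topic = topic;
               this.fetchKey = fetchKey;
           }
   
           @Override
           public String getFetchKey() {
               return fetchKey;
           }
       }
   
       static class MultiTopicsFetcher implements TopicFetcher {
           private final List<String> topics;
           private final String fetchKey;
   
           MultiTopicsFetcher(List<String> topics, String fetchKey) {
               this.topics = topics;
               this.fetchKey = fetchKey;
           }
   
           @Override
           public String getFetchKey() {
               return fetchKey;
           }
       }
   
       /** One builder for both kinds; the topic count picks the implementation. */
       static class FetcherBuilder {
           private final List<String> topics = new ArrayList<>();
           private String fetchKey;
   
           FetcherBuilder topic(String topic) {
               topics.add(topic);
               return this;
           }
   
           FetcherBuilder fetchKey(String fetchKey) {
               this.fetchKey = fetchKey;
               return this;
           }
   
           TopicFetcher build() {
               if (topics.isEmpty()) {
                   throw new IllegalStateException("at least one topic is required");
               }
               return topics.size() == 1
                       ? new SingleTopicFetcher(topics.get(0), fetchKey)
                       : new MultiTopicsFetcher(topics, fetchKey);
           }
       }
   
       public static void main(String[] args) {
           TopicFetcher fetcher = new FetcherBuilder()
                   .topic("topic-a")
                   .topic("topic-b")
                   .fetchKey("cluster-1|topic-a,topic-b") // unique id for this fetcher
                   .build();
           System.out.println(fetcher.getFetchKey());
       }
   }
   ```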
   
   ### Modifications
   
   *Describe the modifications you've made.*
   
   ### Verifying this change
   
   *(Please pick one of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
     *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
     *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
  - If the feature is not applicable for documentation, explain why.
  - If the feature is not yet documented in this PR, please create a follow-up issue to add the documentation.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] vernedeng commented on a diff in pull request #5800: [INLONG-5796][SDK] Optimize fetcher builders

Posted by GitBox <gi...@apache.org>.
vernedeng commented on code in PR #5800:
URL: https://github.com/apache/inlong/pull/5800#discussion_r964626626


##########
inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java:
##########
@@ -18,79 +18,151 @@
 
 package org.apache.inlong.sdk.sort.fetcher.kafka;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.sdk.sort.api.Seeker;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class AckOffsetOnRebalance implements ConsumerRebalanceListener {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(AckOffsetOnRebalance.class);
+    private static final long DEFAULT_MAX_WAIT_FOR_ACK_TIME = 15000L;
+    private final long maxWaitForAckTime;
     private final String clusterId;
     private final Seeker seeker;
     private final ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap;
     private final ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> ackOffsetMap;
+    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final AtomicLong revokedNum = new AtomicLong(0);
+    private final AtomicLong assignedNum = new AtomicLong(0);
 
     public AckOffsetOnRebalance(String clusterId, Seeker seeker,
                                 ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap) {
-        this(clusterId, seeker, commitOffsetMap, null);
+        this(clusterId, seeker, commitOffsetMap, null, null);
     }
 
     public AckOffsetOnRebalance(
             String clusterId,
             Seeker seeker,
             ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap,
-            ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> ackOffsetMap) {
+            ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> ackOffsetMap,
+            KafkaConsumer<byte[], byte[]> consumer) {
+        this(clusterId, seeker, commitOffsetMap, ackOffsetMap, consumer, DEFAULT_MAX_WAIT_FOR_ACK_TIME);
+    }
+
+    public AckOffsetOnRebalance(
+            String clusterId,
+            Seeker seeker,
+            ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap,
+            ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> ackOffsetMap,
+            KafkaConsumer<byte[], byte[]> consumer,
+            long maxWaitForAckTime) {
         this.clusterId = clusterId;
         this.seeker = seeker;
         this.commitOffsetMap = commitOffsetMap;
         this.ackOffsetMap = ackOffsetMap;
+        this.consumer = consumer;
+        this.maxWaitForAckTime = maxWaitForAckTime;
     }
 
     @Override
     public void onPartitionsRevoked(Collection<TopicPartition> collection) {
-        LOGGER.debug("*- in re-balance:onPartitionsRevoked");
+        LOGGER.info("*- in re-balance:onPartitionsRevoked, it's the {} time", revokedNum.incrementAndGet());
         collection.forEach((v) -> {
-            LOGGER.info("clusterId:{},onPartitionsRevoked:{}", clusterId, v.toString());
+            LOGGER.debug("clusterId:{},onPartitionsRevoked:{}, position is {}",
+                    clusterId, v.toString(), consumer.position(v));
         });
-        if (Objects.nonNull(ackOffsetMap) && Objects.nonNull(commitOffsetMap)) {
-            ackRevokedPartitions(collection);
+
+        try {
+            if (Objects.nonNull(ackOffsetMap) && Objects.nonNull(commitOffsetMap)) {
+                //sleep 10s to wait un ack messages
+                long startTime = System.currentTimeMillis();
+                while (System.currentTimeMillis() - startTime < maxWaitForAckTime && !ackReady(collection)) {
+                    TimeUnit.MILLISECONDS.sleep(1000L);
+                }
+                ackRemovedTopicPartitions(collection);
+            }
+        } catch (Throwable t) {
+            LOGGER.warn("got exception in onPartitionsRevoked : ", t);
         }
     }
 
-    private void ackRevokedPartitions(Collection<TopicPartition> collection) {
-        collection.forEach(tp -> {
-            if (!ackOffsetMap.containsKey(tp)) {
-                return;
+    private boolean ackReady(Collection<TopicPartition> revoked) {
+        for (TopicPartition tp : revoked) {
+            ConcurrentSkipListMap<Long, Boolean> tpMap = ackOffsetMap.get(tp);
+            if (Objects.isNull(tpMap)) {
+                continue;
+            }
+            for (Map.Entry<Long, Boolean> entry : tpMap.entrySet()) {
+                if (!entry.getValue()) {
+                    LOGGER.info("tp {}, offset {} has not been ack, wait", tp, entry.getKey());
+                    return false;
+                }
             }
-            ConcurrentSkipListMap<Long, Boolean> tpOffsetMap = ackOffsetMap.remove(tp);
+        }
+        LOGGER.info("all revoked tp have been ack, re-balance right now.");
+        return true;
+    }
+
+    private void ackRemovedTopicPartitions(Collection<TopicPartition> revoked) {
+        LOGGER.info("ack revoked topic partitions");
+        prepareCommit();
+        consumer.commitSync(commitOffsetMap);
+        // remove revoked topic partitions
+        Set<TopicPartition> keySet = ackOffsetMap.keySet();
+        revoked.stream()
+                .filter(keySet::contains)
+                //.peek(tp -> LOGGER.info("remove tp {}", tp))

Review Comment:
   fixed, thx
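
For readers outside the review, a minimal sketch of how a listener like this can be wired into a consumer, matching the new six-argument constructor in the hunk above; the bootstrap address, group id, and topic name are placeholders, and the Seeker argument is left null purely to keep the sketch short:

```java
// Illustrative wiring only; config values and topic names are placeholders.
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.inlong.sdk.sort.fetcher.kafka.AckOffsetOnRebalance;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class RebalanceListenerWiring {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sort-sdk-demo");
        // Offsets are committed manually once downstream has acked the messages.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);

        // The listener receives the consumer so it can commitSync() the
        // computed offsets before partitions move to another group member.
        AckOffsetOnRebalance listener = new AckOffsetOnRebalance(
                "cluster-1",               // clusterId (placeholder)
                null,                      // Seeker, omitted for brevity
                new ConcurrentHashMap<>(), // commitOffsetMap
                new ConcurrentHashMap<>(), // ackOffsetMap
                consumer,
                15000L);                   // maxWaitForAckTime (ms)

        consumer.subscribe(Collections.singletonList("demo-topic"), listener);
    }
}
```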



##########
inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java:
##########
@@ -209,69 +216,67 @@ public List<InLongTopic> getTopics() {
     @Override
     public boolean updateTopics(List<InLongTopic> topics) {
         if (needUpdate(topics)) {
-            return updateAll(topics);
+            LOGGER.info("need to update topic");
+            newTopics = topics;
+            return true;
         }
         LOGGER.info("no need to update topics");
         return false;
     }
 
-    private boolean updateAll(Collection<InLongTopic> newTopics) {
-        if (CollectionUtils.isEmpty(newTopics)) {
-            LOGGER.error("new topics is empty or null");
-            return false;
-        }
-
-        // stop
-        this.setStopConsume(true);
-
-        // update
-        this.onlineTopics = newTopics.stream().collect(Collectors.toMap(InLongTopic::getTopic, t -> t));
-        InLongTopic topic = onlineTopics.values().stream().findFirst().get();
-        this.seeker = SeekerFactory.createKafkaSeeker(consumer, topic);
-        this.listener = new AckOffsetOnRebalance(topic.getInLongCluster().getClusterId(), seeker,
-                commitOffsetMap, ackOffsetMap);
-        Optional.ofNullable(interceptor).ifPresent(i -> i.configure(topic));
-
-        // subscribe new
-        consumer.subscribe(onlineTopics.keySet(), listener);
-
-        // resume
-        this.setStopConsume(false);
-        return true;
-    }
-
     private void prepareCommit() {
+        List<Long> removeOffsets = new ArrayList<>();
         ackOffsetMap.forEach((topicPartition, tpOffsetMap) -> {
-            synchronized (tpOffsetMap) {
-                // get the remove list
-                List<Long> removeOffsets = new ArrayList<>();
-                long commitOffset = -1;
-                for (Long ackOffset : tpOffsetMap.keySet()) {
-                    if (!tpOffsetMap.get(ackOffset)) {
-                        break;
-                    }
-                    removeOffsets.add(ackOffset);
-                    commitOffset = ackOffset;
-                }
-                // the first haven't ack, do nothing
-                if (commitOffset == -1) {
-                    return;
+            // get the remove list
+            long commitOffset = -1;
+            for (Long ackOffset : tpOffsetMap.keySet()) {
+                if (!tpOffsetMap.get(ackOffset)) {
+                    break;
                 }
-
-                // remove offset and commit offset
-                removeOffsets.forEach(tpOffsetMap::remove);
-                commitOffsetMap.put(topicPartition, new OffsetAndMetadata(commitOffset));
+                removeOffsets.add(ackOffset);
+                commitOffset = ackOffset;
+            }
+            // the first haven't ack, do nothing
+            if (CollectionUtils.isEmpty(removeOffsets)) {
+                return;
             }
+
+            // remove offset and commit offset
+            removeOffsets.forEach(tpOffsetMap::remove);
+            removeOffsets.clear();
+            commitOffsetMap.put(topicPartition, new OffsetAndMetadata(commitOffset));
         });
     }
 
     public class Fetcher implements Runnable {
 
+        private boolean subscribeNew() {
+            if (CollectionUtils.isEmpty(newTopics)) {
+                //LOGGER.info("new topics is empty or null");

Review Comment:
   fixed, thx
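
A toy walk-through of the commit rule implemented by `prepareCommit()` in the hunk above: offsets are scanned in ascending order, the scan stops at the first un-acked offset, and the highest acked offset before that gap is what gets committed. The offset values here are made up for illustration:

```java
// Toy illustration of the contiguous-acked-prefix rule in prepareCommit().
import java.util.concurrent.ConcurrentSkipListMap;

public class PrepareCommitToy {

    public static void main(String[] args) {
        // Per-partition ack map: offset -> acked? (ConcurrentSkipListMap
        // iterates keys in ascending order, which the scan relies on).
        ConcurrentSkipListMap<Long, Boolean> tpOffsetMap = new ConcurrentSkipListMap<>();
        tpOffsetMap.put(100L, true);
        tpOffsetMap.put(101L, true);
        tpOffsetMap.put(102L, false); // still in flight
        tpOffsetMap.put(103L, true);

        long commitOffset = -1;
        for (Long ackOffset : tpOffsetMap.keySet()) {
            if (!tpOffsetMap.get(ackOffset)) {
                break; // gap found; nothing beyond it may be committed
            }
            commitOffset = ackOffset;
        }
        // Prints 101: offset 103 is acked but stuck behind the un-acked 102.
        System.out.println("commit offset = " + commitOffset);
    }
}
```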





[GitHub] [inlong] healchow commented on a diff in pull request #5800: [INLONG-5796][SDK] Optimize fetcher builders

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #5800:
URL: https://github.com/apache/inlong/pull/5800#discussion_r964341491


##########
inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java:
##########
@@ -209,69 +216,67 @@ public List<InLongTopic> getTopics() {
     @Override
     public boolean updateTopics(List<InLongTopic> topics) {
         if (needUpdate(topics)) {
-            return updateAll(topics);
+            LOGGER.info("need to update topic");
+            newTopics = topics;
+            return true;
         }
         LOGGER.info("no need to update topics");
         return false;
     }
 
-    private boolean updateAll(Collection<InLongTopic> newTopics) {
-        if (CollectionUtils.isEmpty(newTopics)) {
-            LOGGER.error("new topics is empty or null");
-            return false;
-        }
-
-        // stop
-        this.setStopConsume(true);
-
-        // update
-        this.onlineTopics = newTopics.stream().collect(Collectors.toMap(InLongTopic::getTopic, t -> t));
-        InLongTopic topic = onlineTopics.values().stream().findFirst().get();
-        this.seeker = SeekerFactory.createKafkaSeeker(consumer, topic);
-        this.listener = new AckOffsetOnRebalance(topic.getInLongCluster().getClusterId(), seeker,
-                commitOffsetMap, ackOffsetMap);
-        Optional.ofNullable(interceptor).ifPresent(i -> i.configure(topic));
-
-        // subscribe new
-        consumer.subscribe(onlineTopics.keySet(), listener);
-
-        // resume
-        this.setStopConsume(false);
-        return true;
-    }
-
     private void prepareCommit() {
+        List<Long> removeOffsets = new ArrayList<>();
         ackOffsetMap.forEach((topicPartition, tpOffsetMap) -> {
-            synchronized (tpOffsetMap) {
-                // get the remove list
-                List<Long> removeOffsets = new ArrayList<>();
-                long commitOffset = -1;
-                for (Long ackOffset : tpOffsetMap.keySet()) {
-                    if (!tpOffsetMap.get(ackOffset)) {
-                        break;
-                    }
-                    removeOffsets.add(ackOffset);
-                    commitOffset = ackOffset;
-                }
-                // the first haven't ack, do nothing
-                if (commitOffset == -1) {
-                    return;
+            // get the remove list
+            long commitOffset = -1;
+            for (Long ackOffset : tpOffsetMap.keySet()) {
+                if (!tpOffsetMap.get(ackOffset)) {
+                    break;
                 }
-
-                // remove offset and commit offset
-                removeOffsets.forEach(tpOffsetMap::remove);
-                commitOffsetMap.put(topicPartition, new OffsetAndMetadata(commitOffset));
+                removeOffsets.add(ackOffset);
+                commitOffset = ackOffset;
+            }
+            // the first haven't ack, do nothing
+            if (CollectionUtils.isEmpty(removeOffsets)) {
+                return;
             }
+
+            // remove offset and commit offset
+            removeOffsets.forEach(tpOffsetMap::remove);
+            removeOffsets.clear();
+            commitOffsetMap.put(topicPartition, new OffsetAndMetadata(commitOffset));
         });
     }
 
     public class Fetcher implements Runnable {
 
+        private boolean subscribeNew() {
+            if (CollectionUtils.isEmpty(newTopics)) {
+                //LOGGER.info("new topics is empty or null");

Review Comment:
   Remove it.



##########
inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java:
##########
@@ -18,79 +18,151 @@
 
 package org.apache.inlong.sdk.sort.fetcher.kafka;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.sdk.sort.api.Seeker;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class AckOffsetOnRebalance implements ConsumerRebalanceListener {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(AckOffsetOnRebalance.class);
+    private static final long DEFAULT_MAX_WAIT_FOR_ACK_TIME = 15000L;
+    private final long maxWaitForAckTime;
     private final String clusterId;
     private final Seeker seeker;
     private final ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap;
     private final ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> ackOffsetMap;
+    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final AtomicLong revokedNum = new AtomicLong(0);
+    private final AtomicLong assignedNum = new AtomicLong(0);
 
     public AckOffsetOnRebalance(String clusterId, Seeker seeker,
                                 ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap) {
-        this(clusterId, seeker, commitOffsetMap, null);
+        this(clusterId, seeker, commitOffsetMap, null, null);
     }
 
     public AckOffsetOnRebalance(
             String clusterId,
             Seeker seeker,
             ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap,
-            ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> ackOffsetMap) {
+            ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> ackOffsetMap,
+            KafkaConsumer<byte[], byte[]> consumer) {
+        this(clusterId, seeker, commitOffsetMap, ackOffsetMap, consumer, DEFAULT_MAX_WAIT_FOR_ACK_TIME);
+    }
+
+    public AckOffsetOnRebalance(
+            String clusterId,
+            Seeker seeker,
+            ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap,
+            ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> ackOffsetMap,
+            KafkaConsumer<byte[], byte[]> consumer,
+            long maxWaitForAckTime) {
         this.clusterId = clusterId;
         this.seeker = seeker;
         this.commitOffsetMap = commitOffsetMap;
         this.ackOffsetMap = ackOffsetMap;
+        this.consumer = consumer;
+        this.maxWaitForAckTime = maxWaitForAckTime;
     }
 
     @Override
     public void onPartitionsRevoked(Collection<TopicPartition> collection) {
-        LOGGER.debug("*- in re-balance:onPartitionsRevoked");
+        LOGGER.info("*- in re-balance:onPartitionsRevoked, it's the {} time", revokedNum.incrementAndGet());
         collection.forEach((v) -> {
-            LOGGER.info("clusterId:{},onPartitionsRevoked:{}", clusterId, v.toString());
+            LOGGER.debug("clusterId:{},onPartitionsRevoked:{}, position is {}",
+                    clusterId, v.toString(), consumer.position(v));
         });
-        if (Objects.nonNull(ackOffsetMap) && Objects.nonNull(commitOffsetMap)) {
-            ackRevokedPartitions(collection);
+
+        try {
+            if (Objects.nonNull(ackOffsetMap) && Objects.nonNull(commitOffsetMap)) {
+                //sleep 10s to wait un ack messages
+                long startTime = System.currentTimeMillis();
+                while (System.currentTimeMillis() - startTime < maxWaitForAckTime && !ackReady(collection)) {
+                    TimeUnit.MILLISECONDS.sleep(1000L);
+                }
+                ackRemovedTopicPartitions(collection);
+            }
+        } catch (Throwable t) {
+            LOGGER.warn("got exception in onPartitionsRevoked : ", t);
         }
     }
 
-    private void ackRevokedPartitions(Collection<TopicPartition> collection) {
-        collection.forEach(tp -> {
-            if (!ackOffsetMap.containsKey(tp)) {
-                return;
+    private boolean ackReady(Collection<TopicPartition> revoked) {
+        for (TopicPartition tp : revoked) {
+            ConcurrentSkipListMap<Long, Boolean> tpMap = ackOffsetMap.get(tp);
+            if (Objects.isNull(tpMap)) {
+                continue;
+            }
+            for (Map.Entry<Long, Boolean> entry : tpMap.entrySet()) {
+                if (!entry.getValue()) {
+                    LOGGER.info("tp {}, offset {} has not been ack, wait", tp, entry.getKey());
+                    return false;
+                }
             }
-            ConcurrentSkipListMap<Long, Boolean> tpOffsetMap = ackOffsetMap.remove(tp);
+        }
+        LOGGER.info("all revoked tp have been ack, re-balance right now.");
+        return true;
+    }
+
+    private void ackRemovedTopicPartitions(Collection<TopicPartition> revoked) {
+        LOGGER.info("ack revoked topic partitions");
+        prepareCommit();
+        consumer.commitSync(commitOffsetMap);
+        // remove revoked topic partitions
+        Set<TopicPartition> keySet = ackOffsetMap.keySet();
+        revoked.stream()
+                .filter(keySet::contains)
+                //.peek(tp -> LOGGER.info("remove tp {}", tp))

Review Comment:
   Remove it.



##########
inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java:
##########
@@ -18,79 +18,151 @@
 
 package org.apache.inlong.sdk.sort.fetcher.kafka;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.sdk.sort.api.Seeker;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class AckOffsetOnRebalance implements ConsumerRebalanceListener {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(AckOffsetOnRebalance.class);
+    private static final long DEFAULT_MAX_WAIT_FOR_ACK_TIME = 15000L;
+    private final long maxWaitForAckTime;
     private final String clusterId;
     private final Seeker seeker;
     private final ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap;
     private final ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> ackOffsetMap;
+    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final AtomicLong revokedNum = new AtomicLong(0);
+    private final AtomicLong assignedNum = new AtomicLong(0);
 
     public AckOffsetOnRebalance(String clusterId, Seeker seeker,
                                 ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap) {
-        this(clusterId, seeker, commitOffsetMap, null);
+        this(clusterId, seeker, commitOffsetMap, null, null);
     }
 
     public AckOffsetOnRebalance(
             String clusterId,
             Seeker seeker,
             ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap,
-            ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> ackOffsetMap) {
+            ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> ackOffsetMap,
+            KafkaConsumer<byte[], byte[]> consumer) {
+        this(clusterId, seeker, commitOffsetMap, ackOffsetMap, consumer, DEFAULT_MAX_WAIT_FOR_ACK_TIME);
+    }
+
+    public AckOffsetOnRebalance(
+            String clusterId,
+            Seeker seeker,
+            ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap,
+            ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> ackOffsetMap,
+            KafkaConsumer<byte[], byte[]> consumer,
+            long maxWaitForAckTime) {
         this.clusterId = clusterId;
         this.seeker = seeker;
         this.commitOffsetMap = commitOffsetMap;
         this.ackOffsetMap = ackOffsetMap;
+        this.consumer = consumer;
+        this.maxWaitForAckTime = maxWaitForAckTime;
     }
 
     @Override
     public void onPartitionsRevoked(Collection<TopicPartition> collection) {
-        LOGGER.debug("*- in re-balance:onPartitionsRevoked");
+        LOGGER.info("*- in re-balance:onPartitionsRevoked, it's the {} time", revokedNum.incrementAndGet());
         collection.forEach((v) -> {
-            LOGGER.info("clusterId:{},onPartitionsRevoked:{}", clusterId, v.toString());
+            LOGGER.debug("clusterId:{},onPartitionsRevoked:{}, position is {}",
+                    clusterId, v.toString(), consumer.position(v));
         });
-        if (Objects.nonNull(ackOffsetMap) && Objects.nonNull(commitOffsetMap)) {
-            ackRevokedPartitions(collection);
+
+        try {
+            if (Objects.nonNull(ackOffsetMap) && Objects.nonNull(commitOffsetMap)) {
+                //sleep 10s to wait un ack messages

Review Comment:
   Add one blank after `//`.



##########
inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java:
##########
@@ -59,9 +59,10 @@ public class SortClientConfig implements Serializable {
     private int maxEmptyPollSleepMs = 500;
     private int emptyPollTimes = 10;
     private int cleanOldConsumerIntervalSec = 60;
+    private int maxConsumerSize = 5;
 
     public SortClientConfig(String sortTaskId, String sortClusterName, InLongTopicChangeListener assignmentsListener,
-            ConsumeStrategy consumeStrategy, String localIp) {
+                            ConsumeStrategy consumeStrategy, String localIp) {

Review Comment:
   No need for these indents.





[GitHub] [inlong] dockerzhang merged pull request #5800: [INLONG-5796][SDK] Optimize fetcher builders

Posted by GitBox <gi...@apache.org>.
dockerzhang merged PR #5800:
URL: https://github.com/apache/inlong/pull/5800




[GitHub] [inlong] vernedeng commented on a diff in pull request #5800: [INLONG-5796][SDK] Optimize fetcher builders

Posted by GitBox <gi...@apache.org>.
vernedeng commented on code in PR #5800:
URL: https://github.com/apache/inlong/pull/5800#discussion_r964626383


##########
inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java:
##########
@@ -59,9 +59,10 @@ public class SortClientConfig implements Serializable {
     private int maxEmptyPollSleepMs = 500;
     private int emptyPollTimes = 10;
     private int cleanOldConsumerIntervalSec = 60;
+    private int maxConsumerSize = 5;
 
     public SortClientConfig(String sortTaskId, String sortClusterName, InLongTopicChangeListener assignmentsListener,
-            ConsumeStrategy consumeStrategy, String localIp) {
+                            ConsumeStrategy consumeStrategy, String localIp) {

Review Comment:
   fixed, thx



##########
inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java:
##########
@@ -18,79 +18,151 @@
 
 package org.apache.inlong.sdk.sort.fetcher.kafka;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.sdk.sort.api.Seeker;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class AckOffsetOnRebalance implements ConsumerRebalanceListener {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(AckOffsetOnRebalance.class);
+    private static final long DEFAULT_MAX_WAIT_FOR_ACK_TIME = 15000L;
+    private final long maxWaitForAckTime;
     private final String clusterId;
     private final Seeker seeker;
     private final ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap;
     private final ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> ackOffsetMap;
+    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final AtomicLong revokedNum = new AtomicLong(0);
+    private final AtomicLong assignedNum = new AtomicLong(0);
 
     public AckOffsetOnRebalance(String clusterId, Seeker seeker,
                                 ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap) {
-        this(clusterId, seeker, commitOffsetMap, null);
+        this(clusterId, seeker, commitOffsetMap, null, null);
     }
 
     public AckOffsetOnRebalance(
             String clusterId,
             Seeker seeker,
             ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap,
-            ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> ackOffsetMap) {
+            ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> ackOffsetMap,
+            KafkaConsumer<byte[], byte[]> consumer) {
+        this(clusterId, seeker, commitOffsetMap, ackOffsetMap, consumer, DEFAULT_MAX_WAIT_FOR_ACK_TIME);
+    }
+
+    public AckOffsetOnRebalance(
+            String clusterId,
+            Seeker seeker,
+            ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap,
+            ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> ackOffsetMap,
+            KafkaConsumer<byte[], byte[]> consumer,
+            long maxWaitForAckTime) {
         this.clusterId = clusterId;
         this.seeker = seeker;
         this.commitOffsetMap = commitOffsetMap;
         this.ackOffsetMap = ackOffsetMap;
+        this.consumer = consumer;
+        this.maxWaitForAckTime = maxWaitForAckTime;
     }
 
     @Override
     public void onPartitionsRevoked(Collection<TopicPartition> collection) {
-        LOGGER.debug("*- in re-balance:onPartitionsRevoked");
+        LOGGER.info("*- in re-balance:onPartitionsRevoked, it's the {} time", revokedNum.incrementAndGet());
         collection.forEach((v) -> {
-            LOGGER.info("clusterId:{},onPartitionsRevoked:{}", clusterId, v.toString());
+            LOGGER.debug("clusterId:{},onPartitionsRevoked:{}, position is {}",
+                    clusterId, v.toString(), consumer.position(v));
         });
-        if (Objects.nonNull(ackOffsetMap) && Objects.nonNull(commitOffsetMap)) {
-            ackRevokedPartitions(collection);
+
+        try {
+            if (Objects.nonNull(ackOffsetMap) && Objects.nonNull(commitOffsetMap)) {
+                //sleep 10s to wait un ack messages

Review Comment:
   fixed, thx


