Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/01/12 01:41:03 UTC

[GitHub] [incubator-inlong] wardlican opened a new pull request #2142: [INLONG-2077] sort-sdk change pulsar consume mode from listener to fetch

wardlican opened a new pull request #2142:
URL: https://github.com/apache/incubator-inlong/pull/2142


   [INLONG-2077] sort-sdk change pulsar consume mode from listener to fetch
   
   
   ### Title Name: [INLONG-XYZ][component] Title of the pull request
   
   where *XYZ* should be replaced by the actual issue number.
   
   Fixes #<xyz>
   
   ### Motivation
   
   *Explain here the context, and why you're making that change. What is the problem you're trying to solve.*
   
   ### Modifications
   
   *Describe the modifications you've done.*
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
     - If a feature is not applicable for documentation, explain why?
     - If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] dockerzhang merged pull request #2142: [INLONG-2077] sort-sdk change pulsar consume mode from listener to fetch

Posted by GitBox <gi...@apache.org>.
dockerzhang merged pull request #2142:
URL: https://github.com/apache/incubator-inlong/pull/2142


   





[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #2142: [INLONG-2077] sort-sdk change pulsar consume mode from listener to fetch

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #2142:
URL: https://github.com/apache/incubator-inlong/pull/2142#discussion_r784449274



##########
File path: inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/stat/SortClientStateCounter.java
##########
@@ -236,12 +236,55 @@ public SortClientStateCounter addRequestManagerCommonErrorTimes(long num) {
     /**
      * count manager result param error times
      *
-     * @param num int
-     * @return SortClientStateCounter
+     * @param num long
+     * @return {@link SortClientStateCounter}
      */
     public SortClientStateCounter addRequestManagerParamErrorTimes(long num) {
         count.getAndAdd(15, num);
         return this;
     }
 
+    /**
+     * count thread fetch times
+     *
+     * @param num long
+     * @return {@link SortClientStateCounter}
+     */
+    public SortClientStateCounter addFetchTimes(long num) {
+        count.getAndAdd(16, num);

Review comment:
       Magic number: 16. Please replace it with a named constant.
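
       One way to address the magic-number comment is sketched below. It is only an illustration: `NamedIndexCounter` is a hypothetical stand-in for `SortClientStateCounter`, the constant names are invented, and it assumes the `count` field is an `AtomicLongArray` (the diff shows `count.getAndAdd(index, num)` but not the field's declaration).

```java
import java.util.concurrent.atomic.AtomicLongArray;

// Hypothetical stand-in for SortClientStateCounter; all names below are illustrative.
final class NamedIndexCounter {

    // Named slots replace the bare literals 16 and 17 used in the diff.
    private static final int FETCH_TIMES_INDEX = 16;
    private static final int FETCH_ERROR_TIMES_INDEX = 17;
    // Assumed array size: one slot past the highest index used in this sketch.
    private static final int COUNTER_SIZE = 18;

    // Assumption: the real counter is backed by an AtomicLongArray.
    private final AtomicLongArray count = new AtomicLongArray(COUNTER_SIZE);

    NamedIndexCounter addFetchTimes(long num) {
        count.getAndAdd(FETCH_TIMES_INDEX, num);
        return this;
    }

    NamedIndexCounter addFetchErrorTimes(long num) {
        count.getAndAdd(FETCH_ERROR_TIMES_INDEX, num);
        return this;
    }

    long getFetchTimes() {
        return count.get(FETCH_TIMES_INDEX);
    }

    long getFetchErrorTimes() {
        return count.get(FETCH_ERROR_TIMES_INDEX);
    }
}
```

       With named indices, a reader can tell which statistic a slot holds without counting array positions, and adding a slot only touches the constant block.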

##########
File path: inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.sort.impl.pulsar;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
+import org.apache.inlong.sdk.sort.entity.InLongMessage;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.inlong.sdk.sort.util.StringUtil;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Messages;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.shade.org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InLongPulsarFetcherImpl extends InLongTopicFetcher {
+
+    private final Logger logger = LoggerFactory.getLogger(InLongPulsarFetcherImpl.class);
+    private final ReentrantReadWriteLock mainLock = new ReentrantReadWriteLock(true);
+    private final ConcurrentHashMap<String, MessageId> offsetCache = new ConcurrentHashMap<>();
+    private volatile boolean closed = false;
+    private Consumer<byte[]> consumer;
+    private volatile boolean stopConsume = false;
+    private volatile Thread fetchThread;
+    private long sleepTime = 0L;
+    private int emptyPollTimes = 0;
+
+    public InLongPulsarFetcherImpl(InLongTopic inLongTopic,
+            ClientContext context) {
+        super(inLongTopic, context);
+    }
+
+    @Override
+    public void stopConsume(boolean stopConsume) {
+        this.stopConsume = stopConsume;
+    }
+
+    @Override
+    public boolean isConsumeStop() {
+        return stopConsume;

Review comment:
       The variable name does not match the method name.
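
       A minimal sketch of what matching names could look like, assuming the accessor name `isConsumeStop` is kept and the field and setter are renamed to follow it. `ConsumeSwitch` and `setConsumeStop` are hypothetical names, not part of the PR.

```java
// Hypothetical holder for the stop flag; the field, setter and getter share one stem.
final class ConsumeSwitch {

    // volatile so that a fetch thread sees updates made by another thread,
    // mirroring the volatile flag in the diff.
    private volatile boolean consumeStop = false;

    void setConsumeStop(boolean consumeStop) {
        this.consumeStop = consumeStop;
    }

    boolean isConsumeStop() {
        return consumeStop;
    }
}
```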

##########
File path: inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InLongTopicFetcher.java
##########
@@ -32,7 +29,7 @@ public InLongTopicFetcher(InLongTopic inLongTopic, ClientContext context) {
         this.context = context;
     }
 
-    public abstract boolean init(PulsarClient pulsarClient);
+    public abstract boolean init(Object client);

Review comment:
       The client parameter could use Java generics instead of Object.
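
       The generics suggestion could look roughly like the sketch below. Everything here is hypothetical: `TypedTopicFetcher`, `FakePulsarClient` and `TypedPulsarFetcher` are simplified stand-ins so the example compiles without the Pulsar client library, and the real `InLongTopicFetcher` has more abstract methods than shown.

```java
// Hypothetical, simplified base class: the client type becomes a type parameter.
abstract class TypedTopicFetcher<C> {
    abstract boolean init(C client);
}

// Stand-in for a real client such as PulsarClient (assumption for illustration only).
final class FakePulsarClient {
    final String serviceUrl;

    FakePulsarClient(String serviceUrl) {
        this.serviceUrl = serviceUrl;
    }
}

// The subclass binds C once, so init needs no cast from Object.
final class TypedPulsarFetcher extends TypedTopicFetcher<FakePulsarClient> {
    private FakePulsarClient client;

    @Override
    boolean init(FakePulsarClient client) {
        if (client == null) {
            return false;
        }
        this.client = client;
        return true;
    }

    String serviceUrl() {
        return client.serviceUrl;
    }
}
```

       Compared with `init(Object client)`, passing the wrong client type becomes a compile error rather than a `ClassCastException` at runtime.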

##########
File path: inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.sort.impl.pulsar;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
+import org.apache.inlong.sdk.sort.entity.InLongMessage;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.inlong.sdk.sort.util.StringUtil;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Messages;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.shade.org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InLongPulsarFetcherImpl extends InLongTopicFetcher {
+
+    private final Logger logger = LoggerFactory.getLogger(InLongPulsarFetcherImpl.class);
+    private final ReentrantReadWriteLock mainLock = new ReentrantReadWriteLock(true);
+    private final ConcurrentHashMap<String, MessageId> offsetCache = new ConcurrentHashMap<>();
+    private volatile boolean closed = false;
+    private Consumer<byte[]> consumer;
+    private volatile boolean stopConsume = false;
+    private volatile Thread fetchThread;
+    private long sleepTime = 0L;
+    private int emptyPollTimes = 0;
+
+    public InLongPulsarFetcherImpl(InLongTopic inLongTopic,
+            ClientContext context) {
+        super(inLongTopic, context);
+    }
+
+    @Override
+    public void stopConsume(boolean stopConsume) {
+        this.stopConsume = stopConsume;
+    }
+
+    @Override
+    public boolean isConsumeStop() {
+        return stopConsume;
+    }
+
+    @Override
+    public InLongTopic getInLongTopic() {
+        return inLongTopic;
+    }
+
+    @Override
+    public long getConsumedDataSize() {
+        return 0;
+    }
+
+    @Override
+    public long getAckedOffset() {
+        return 0;
+    }
+
+    private void ackSucc(String offset) {
+        offsetCache.remove(offset);
+        context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()).addAckSuccTimes(1);
+    }
+
+    /**
+     * ack Offset
+     *
+     * @param msgOffset String
+     */
+    @Override
+    public void ack(String msgOffset) throws Exception {
+        if (!StringUtils.isEmpty(msgOffset)) {
+            try {
+                if (consumer == null) {
+                    context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                            inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                            .addAckFailTimes(1);
+                    logger.error("consumer == null");
+                    return;
+                }
+                MessageId messageId = offsetCache.get(msgOffset);
+                if (messageId == null) {
+                    context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                            inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                            .addAckFailTimes(1);
+                    logger.error("messageId == null");
+                    return;
+                }
+                consumer.acknowledgeAsync(messageId)
+                        .thenAccept(consumer -> ackSucc(msgOffset))
+                        .exceptionally(exception -> {
+                            logger.error("ack fail:{}", msgOffset);
+                            context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                                    inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                                    .addAckFailTimes(1);
+                            return null;
+                        });
+            } catch (Exception e) {
+                context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                        inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()).addAckFailTimes(1);
+                logger.error(e.getMessage(), e);
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * create Consumer and fetch thread
+     *
+     * @return boolean
+     */
+    @Override
+    public boolean init(Object object) {
+        PulsarClient pulsarClient = (PulsarClient) object;
+        return createConsumer(pulsarClient);
+    }
+
+    private boolean createConsumer(PulsarClient client) {
+        try {
+            consumer = client.newConsumer(Schema.BYTES)
+                    .topic(inLongTopic.getTopic())
+                    .subscriptionName(context.getConfig().getSortTaskId())
+                    .subscriptionType(SubscriptionType.Shared)
+                    .startMessageIdInclusive()
+                    .ackTimeout(10, TimeUnit.SECONDS)
+                    .receiverQueueSize(context.getConfig().getPulsarReceiveQueueSize())
+                    .subscribe();
+
+            String threadName = "sort_sdk_fetch_thread_" + StringUtil.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss.SSS");
+            this.fetchThread = new Thread(new Fetcher(), threadName);
+            this.fetchThread.start();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * isValidState
+     */
+    public void isValidState() {
+        if (closed) {
+            throw new IllegalStateException(inLongTopic + " closed.");
+        }
+    }
+
+    /**
+     * pause
+     */
+    @Override
+    public void pause() {
+        if (consumer != null) {
+            consumer.pause();
+        }
+    }
+
+    /**
+     * resume
+     */
+    @Override
+    public void resume() {
+        if (consumer != null) {
+            consumer.resume();
+        }
+    }
+
+    /**
+     * close
+     *
+     * @return true/false
+     */
+    @Override
+    public boolean close() {
+        mainLock.writeLock().lock();
+        try {
+            this.closed = true;
+            try {
+                if (consumer != null) {
+                    consumer.close();
+                }
+                if (fetchThread != null) {
+                    fetchThread.interrupt();
+                }
+            } catch (PulsarClientException e) {
+                e.printStackTrace();
+            }
+
+            logger.info("closed {}", inLongTopic);
+            return true;
+        } finally {
+            mainLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public boolean isClosed() {
+        return closed;
+    }
+
+    public class Fetcher implements Runnable {
+
+        /**
+         * put the received msg to onFinished method
+         *
+         * @param messageRecords {@link List<MessageRecord>}
+         */
+        private void handleAndCallbackMsg(List<MessageRecord> messageRecords) {
+            long start = System.currentTimeMillis();
+            try {
+                context.getStatManager()
+                        .getStatistics(context.getConfig().getSortTaskId(),
+                                inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                        .addCallbackTimes(1);
+                context.getConfig().getCallback().onFinishedBatch(messageRecords);
+                context.getStatManager()
+                        .getStatistics(context.getConfig().getSortTaskId(),
+                                inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                        .addCallbackTimeCost(System.currentTimeMillis() - start).addCallbackDoneTimes(1);
+            } catch (Exception e) {
+                context.getStatManager()
+                        .getStatistics(context.getConfig().getSortTaskId(),
+                                inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                        .addCallbackErrorTimes(1);
+                e.printStackTrace();
+            }
+        }
+
+        private String getOffset(MessageId msgId) {
+            return Base64.getEncoder().encodeToString(msgId.toByteArray());
+        }
+
+        @Override
+        public void run() {
+            boolean hasPermit;
+            while (true) {
+                hasPermit = false;
+                try {
+                    if (context.getConfig().isStopConsume() || stopConsume) {
+                        TimeUnit.MILLISECONDS.sleep(50);
+                        continue;
+                    }
+
+                    if (sleepTime > 0) {
+                        TimeUnit.MILLISECONDS.sleep(sleepTime);
+                    }
+
+                    context.acquireRequestPermit();
+                    hasPermit = true;
+                    context.getStatManager()
+                            .getStatistics(context.getConfig().getSortTaskId(),
+                                    inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                            .addMsgCount(1).addFetchTimes(1);
+
+                    long startFetchTime = System.currentTimeMillis();
+                    Messages<byte[]> messages = consumer.batchReceive();
+                    context.getStatManager()
+                            .getStatistics(context.getConfig().getSortTaskId(),
+                                    inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                            .addFetchTimeCost(System.currentTimeMillis() - startFetchTime);

Review comment:
       `System.currentTimeMillis()` is invoked repeatedly within a single loop iteration.
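
       One possible reading of this comment is that a timestamp taken at the end of the fetch can be reused as the start of the next measured phase. The sketch below illustrates that idea only; `FetchTiming` and `measure` are hypothetical names, and the clock is injected as a `LongSupplier` so the behaviour can be checked without real time passing.

```java
import java.util.function.LongSupplier;

// Hypothetical helper: times a fetch phase and a callback phase with three clock reads.
final class FetchTiming {
    final long fetchCostMs;
    final long callbackCostMs;

    private FetchTiming(long fetchCostMs, long callbackCostMs) {
        this.fetchCostMs = fetchCostMs;
        this.callbackCostMs = callbackCostMs;
    }

    static FetchTiming measure(LongSupplier clock, Runnable fetch, Runnable callback) {
        long fetchStart = clock.getAsLong();
        fetch.run();
        // Read once and reuse: end of the fetch phase is also the start of the callback phase.
        long fetchEnd = clock.getAsLong();
        callback.run();
        long callbackEnd = clock.getAsLong();
        return new FetchTiming(fetchEnd - fetchStart, callbackEnd - fetchEnd);
    }
}
```

       In production code the clock argument would typically be `System::currentTimeMillis`.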

##########
File path: inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.sort.impl.pulsar;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
+import org.apache.inlong.sdk.sort.entity.InLongMessage;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.inlong.sdk.sort.util.StringUtil;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Messages;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.shade.org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InLongPulsarFetcherImpl extends InLongTopicFetcher {
+
+    private final Logger logger = LoggerFactory.getLogger(InLongPulsarFetcherImpl.class);
+    private final ReentrantReadWriteLock mainLock = new ReentrantReadWriteLock(true);
+    private final ConcurrentHashMap<String, MessageId> offsetCache = new ConcurrentHashMap<>();
+    private volatile boolean closed = false;
+    private Consumer<byte[]> consumer;
+    private volatile boolean stopConsume = false;

Review comment:
       It would be better for the boolean variable to have an "is" prefix.

##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SubscribeFetchResult.java
##########
@@ -47,19 +46,45 @@
      * @param sortId The sortId of fetched message.
      * @param message Message that fetched from upstream data storage.
      */
+
     private SubscribeFetchResult(
             final String sortId,
             final MessageRecord message) {
         this.sortId = sortId;
         this.headers.put(Constants.HEADER_KEY_MESSAGE_KEY, message.getMsgKey());
         this.headers.put(Constants.HEADER_KEY_MSG_OFFSET, message.getOffset());
         this.headers.put(Constants.HEADER_KEY_MSG_TIME, String.valueOf(message.getRecTime()));
-        this.headers.putAll(message.getMsgHeader());
-        this.body = message.getMessage();
+        //TODO to fix here

Review comment:
       ditto

##########
File path: inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.sort.impl.pulsar;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
+import org.apache.inlong.sdk.sort.entity.InLongMessage;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.inlong.sdk.sort.util.StringUtil;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Messages;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.shade.org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InLongPulsarFetcherImpl extends InLongTopicFetcher {
+
+    private final Logger logger = LoggerFactory.getLogger(InLongPulsarFetcherImpl.class);
+    private final ReentrantReadWriteLock mainLock = new ReentrantReadWriteLock(true);
+    private final ConcurrentHashMap<String, MessageId> offsetCache = new ConcurrentHashMap<>();
+    private volatile boolean closed = false;
+    private Consumer<byte[]> consumer;
+    private volatile boolean stopConsume = false;
+    private volatile Thread fetchThread;
+    private long sleepTime = 0L;
+    private int emptyPollTimes = 0;
+
+    public InLongPulsarFetcherImpl(InLongTopic inLongTopic,
+            ClientContext context) {
+        super(inLongTopic, context);
+    }
+
+    @Override
+    public void stopConsume(boolean stopConsume) {
+        this.stopConsume = stopConsume;
+    }
+
+    @Override
+    public boolean isConsumeStop() {
+        return stopConsume;
+    }
+
+    @Override
+    public InLongTopic getInLongTopic() {
+        return inLongTopic;
+    }
+
+    @Override
+    public long getConsumedDataSize() {
+        return 0;
+    }
+
+    @Override
+    public long getAckedOffset() {
+        return 0;

Review comment:
       Is this unfinished? If so, it would be better to add a TODO comment.

##########
File path: inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/stat/SortClientStateCounter.java
##########
@@ -236,12 +236,55 @@ public SortClientStateCounter addRequestManagerCommonErrorTimes(long num) {
     /**
      * count manager result param error times
      *
-     * @param num int
-     * @return SortClientStateCounter
+     * @param num long
+     * @return {@link SortClientStateCounter}
      */
     public SortClientStateCounter addRequestManagerParamErrorTimes(long num) {
         count.getAndAdd(15, num);
         return this;
     }
 
+    /**
+     * count thread fetch times
+     *
+     * @param num long
+     * @return {@link SortClientStateCounter}
+     */
+    public SortClientStateCounter addFetchTimes(long num) {
+        count.getAndAdd(16, num);
+        return this;
+    }
+
+    /**
+     * count fetch error times
+     *
+     * @param num long
+     * @return {@link SortClientStateCounter}
+     */
+    public SortClientStateCounter addFetchErrorTimes(long num) {
+        count.getAndAdd(17, num);

Review comment:
       ditto

##########
File path: pom.xml
##########
@@ -95,10 +97,10 @@
         <module>inlong-agent</module>
         <module>inlong-manager</module>
         <module>inlong-sort</module>
+        <module>inlong-sdk</module>

Review comment:
       In the future, the inlong-sort module will use the inlong-sdk module too, so please give inlong-sdk a higher priority in the module list.

##########
File path: inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.sort.impl.pulsar;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
+import org.apache.inlong.sdk.sort.entity.InLongMessage;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.inlong.sdk.sort.util.StringUtil;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Messages;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.shade.org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InLongPulsarFetcherImpl extends InLongTopicFetcher {
+
+    private final Logger logger = LoggerFactory.getLogger(InLongPulsarFetcherImpl.class);
+    private final ReentrantReadWriteLock mainLock = new ReentrantReadWriteLock(true);
+    private final ConcurrentHashMap<String, MessageId> offsetCache = new ConcurrentHashMap<>();
+    private volatile boolean closed = false;
+    private Consumer<byte[]> consumer;
+    private volatile boolean stopConsume = false;
+    private volatile Thread fetchThread;
+    private long sleepTime = 0L;
+    private int emptyPollTimes = 0;
+
+    public InLongPulsarFetcherImpl(InLongTopic inLongTopic,
+            ClientContext context) {
+        super(inLongTopic, context);
+    }
+
+    @Override
+    public void stopConsume(boolean stopConsume) {
+        this.stopConsume = stopConsume;
+    }
+
+    @Override
+    public boolean isConsumeStop() {
+        return stopConsume;
+    }
+
+    @Override
+    public InLongTopic getInLongTopic() {
+        return inLongTopic;
+    }
+
+    @Override
+    public long getConsumedDataSize() {
+        return 0;

Review comment:
       Is this unfinished? If so, it would be better to add a TODO comment.

##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
##########
@@ -100,10 +102,17 @@ public void setClient(@NotNull SortClient client) {
     public void onFinished(final MessageRecord messageRecord) {
         try {
             Preconditions.checkState(messageRecord != null, "Fetched msg is null.");
-            final SubscribeFetchResult result = SubscribeFetchResult.Factory.create(sortId, messageRecord);
-            final ProfileEvent profileEvent = new ProfileEvent(result.getBody(), result.getHeaders());
-            channelProcessor.processEvent(profileEvent);
-            context.reportToMetric(profileEvent, sortId, "-", SortSdkSourceContext.FetchResult.SUCCESS);
+            for (InLongMessage inLongMessage : messageRecord.getMsgs()) {
+                //TODO fix here

Review comment:
       The TODO description is not clear.

##########
File path: inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.sort.impl.pulsar;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
+import org.apache.inlong.sdk.sort.entity.InLongMessage;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.inlong.sdk.sort.util.StringUtil;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Messages;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.shade.org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InLongPulsarFetcherImpl extends InLongTopicFetcher {
+
+    private final Logger logger = LoggerFactory.getLogger(InLongPulsarFetcherImpl.class);
+    private final ReentrantReadWriteLock mainLock = new ReentrantReadWriteLock(true);
+    private final ConcurrentHashMap<String, MessageId> offsetCache = new ConcurrentHashMap<>();
+    private volatile boolean closed = false;
+    private Consumer<byte[]> consumer;
+    private volatile boolean stopConsume = false;
+    private volatile Thread fetchThread;
+    private long sleepTime = 0L;
+    private int emptyPollTimes = 0;
+
+    public InLongPulsarFetcherImpl(InLongTopic inLongTopic,
+            ClientContext context) {
+        super(inLongTopic, context);
+    }
+
+    @Override
+    public void stopConsume(boolean stopConsume) {
+        this.stopConsume = stopConsume;
+    }
+
+    @Override
+    public boolean isConsumeStop() {
+        return stopConsume;
+    }
+
+    @Override
+    public InLongTopic getInLongTopic() {
+        return inLongTopic;
+    }
+
+    @Override
+    public long getConsumedDataSize() {
+        return 0;
+    }
+
+    @Override
+    public long getAckedOffset() {
+        return 0;
+    }
+
+    private void ackSucc(String offset) {
+        offsetCache.remove(offset);
+        context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()).addAckSuccTimes(1);
+    }
+
+    /**
+     * ack Offset
+     *
+     * @param msgOffset String
+     */
+    @Override
+    public void ack(String msgOffset) throws Exception {
+        if (!StringUtils.isEmpty(msgOffset)) {
+            try {
+                if (consumer == null) {
+                    context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                            inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                            .addAckFailTimes(1);
+                    logger.error("consumer == null");
+                    return;
+                }
+                MessageId messageId = offsetCache.get(msgOffset);
+                if (messageId == null) {
+                    context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                            inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                            .addAckFailTimes(1);
+                    logger.error("messageId == null");
+                    return;
+                }
+                consumer.acknowledgeAsync(messageId)
+                        .thenAccept(consumer -> ackSucc(msgOffset))
+                        .exceptionally(exception -> {
+                            logger.error("ack fail:{}", msgOffset);
+                            context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                                    inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                                    .addAckFailTimes(1);
+                            return null;
+                        });
+            } catch (Exception e) {
+                context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                        inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()).addAckFailTimes(1);
+                logger.error(e.getMessage(), e);
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * create Consumer and fetch thread
+     *
+     * @return boolean
+     */
+    @Override
+    public boolean init(Object object) {
+        PulsarClient pulsarClient = (PulsarClient) object;
+        return createConsumer(pulsarClient);
+    }
+
+    private boolean createConsumer(PulsarClient client) {
+        try {
+            consumer = client.newConsumer(Schema.BYTES)
+                    .topic(inLongTopic.getTopic())
+                    .subscriptionName(context.getConfig().getSortTaskId())
+                    .subscriptionType(SubscriptionType.Shared)
+                    .startMessageIdInclusive()
+                    .ackTimeout(10, TimeUnit.SECONDS)
+                    .receiverQueueSize(context.getConfig().getPulsarReceiveQueueSize())
+                    .subscribe();
+
+            String threadName = "sort_sdk_fetch_thread_" + StringUtil.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss.SSS");
+            this.fetchThread = new Thread(new Fetcher(), threadName);
+            this.fetchThread.start();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * isValidState
+     */
+    public void isValidState() {
+        if (closed) {
+            throw new IllegalStateException(inLongTopic + " closed.");
+        }
+    }
+
+    /**
+     * pause
+     */
+    @Override
+    public void pause() {
+        if (consumer != null) {
+            consumer.pause();
+        }
+    }
+
+    /**
+     * resume
+     */
+    @Override
+    public void resume() {
+        if (consumer != null) {
+            consumer.resume();
+        }
+    }
+
+    /**
+     * close
+     *
+     * @return true/false
+     */
+    @Override
+    public boolean close() {
+        mainLock.writeLock().lock();
+        try {
+            this.closed = true;
+            try {
+                if (consumer != null) {
+                    consumer.close();
+                }
+                if (fetchThread != null) {
+                    fetchThread.interrupt();
+                }
+            } catch (PulsarClientException e) {
+                e.printStackTrace();
+            }
+
+            logger.info("closed {}", inLongTopic);
+            return true;
+        } finally {
+            mainLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public boolean isClosed() {
+        return closed;
+    }
+
+    public class Fetcher implements Runnable {

Review comment:
       The Fetcher runnable invokes methods of InLongPulsarFetcherImpl,
   so the two classes are not cohesive.
   They could be merged into one class.
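
   One way to read this suggestion is to let the fetcher class implement Runnable itself.
   The sketch below is only an illustration under assumptions: SingleClassFetcher, its Supplier
   source, and the delivered list are hypothetical stand-ins (for the Pulsar consumer and the
   read callback) and are not part of sort-sdk.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;

/** Hypothetical sketch: the fetcher is its own Runnable, so no inner class reaches into outer state. */
final class SingleClassFetcher implements Runnable {

    // Stand-in for the Pulsar consumer; returns null when nothing is available.
    private final Supplier<String> source;
    // Stand-in for the read callback that receives fetched messages.
    private final List<String> delivered = new CopyOnWriteArrayList<>();
    private volatile boolean closed = false;
    private volatile boolean stopConsume = false;

    SingleClassFetcher(Supplier<String> source) {
        this.source = source;
    }

    void stopConsume(boolean stopConsume) {
        this.stopConsume = stopConsume;
    }

    void close() {
        this.closed = true;
    }

    boolean isClosed() {
        return closed;
    }

    List<String> delivered() {
        return delivered;
    }

    @Override
    public void run() {
        while (!closed) {
            // Skip the source entirely while consumption is paused.
            String message = stopConsume ? null : source.get();
            if (message != null) {
                delivered.add(message);
                continue;
            }
            try {
                Thread.sleep(10L); // back off while paused or idle
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```

   With this shape the loop state (closed, stopConsume) and the loop itself live in one class,
   and the instance can be handed directly to a Thread or an executor.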

##########
File path: inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.sort.impl.pulsar;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
+import org.apache.inlong.sdk.sort.entity.InLongMessage;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.inlong.sdk.sort.util.StringUtil;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Messages;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.shade.org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InLongPulsarFetcherImpl extends InLongTopicFetcher {
+
+    private final Logger logger = LoggerFactory.getLogger(InLongPulsarFetcherImpl.class);
+    private final ReentrantReadWriteLock mainLock = new ReentrantReadWriteLock(true);
+    private final ConcurrentHashMap<String, MessageId> offsetCache = new ConcurrentHashMap<>();
+    private volatile boolean closed = false;
+    private Consumer<byte[]> consumer;
+    private volatile boolean stopConsume = false;
+    private volatile Thread fetchThread;
+    private long sleepTime = 0L;
+    private int emptyPollTimes = 0;
+
+    public InLongPulsarFetcherImpl(InLongTopic inLongTopic,
+            ClientContext context) {
+        super(inLongTopic, context);
+    }
+
+    @Override
+    public void stopConsume(boolean stopConsume) {
+        this.stopConsume = stopConsume;
+    }
+
+    @Override
+    public boolean isConsumeStop() {
+        return stopConsume;
+    }
+
+    @Override
+    public InLongTopic getInLongTopic() {
+        return inLongTopic;
+    }
+
+    @Override
+    public long getConsumedDataSize() {
+        return 0;
+    }
+
+    @Override
+    public long getAckedOffset() {
+        return 0;
+    }
+
+    private void ackSucc(String offset) {
+        offsetCache.remove(offset);
+        context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()).addAckSuccTimes(1);
+    }
+
+    /**
+     * ack Offset
+     *
+     * @param msgOffset String
+     */
+    @Override
+    public void ack(String msgOffset) throws Exception {
+        if (!StringUtils.isEmpty(msgOffset)) {
+            try {
+                if (consumer == null) {
+                    context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                            inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                            .addAckFailTimes(1);
+                    logger.error("consumer == null");
+                    return;
+                }
+                MessageId messageId = offsetCache.get(msgOffset);
+                if (messageId == null) {
+                    context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                            inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                            .addAckFailTimes(1);
+                    logger.error("messageId == null");
+                    return;
+                }
+                consumer.acknowledgeAsync(messageId)
+                        .thenAccept(consumer -> ackSucc(msgOffset))
+                        .exceptionally(exception -> {
+                            logger.error("ack fail:{}", msgOffset);
+                            context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                                    inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                                    .addAckFailTimes(1);
+                            return null;
+                        });
+            } catch (Exception e) {
+                context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                        inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()).addAckFailTimes(1);
+                logger.error(e.getMessage(), e);
+                throw e;

Review comment:
       How should Sort process this exception when it invokes the ack method?
   It would be better for Sort-sdk to handle this exception itself.
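
   A possible shape for this, sketched under assumptions: the SDK counts the failure and reports
   it through the returned future instead of rethrowing. SafeAcker and its ackFunction parameter
   are hypothetical names; ackFunction stands in for a call such as consumer::acknowledgeAsync,
   and the two counters stand in for the stat manager.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/** Hypothetical sketch: ack reports failure via counters and a boolean result, never by throwing. */
final class SafeAcker<I> {

    private final ConcurrentHashMap<String, I> offsetCache = new ConcurrentHashMap<>();
    // Stand-in for the asynchronous acknowledge call of the real consumer.
    private final Function<I, CompletableFuture<Void>> ackFunction;
    final AtomicLong ackSuccTimes = new AtomicLong();
    final AtomicLong ackFailTimes = new AtomicLong();

    SafeAcker(Function<I, CompletableFuture<Void>> ackFunction) {
        this.ackFunction = ackFunction;
    }

    /** Remembers the message id that belongs to an offset handed out to the caller. */
    void track(String msgOffset, I messageId) {
        offsetCache.put(msgOffset, messageId);
    }

    /** Completes with true on success and false on any failure; the caller never sees an exception. */
    CompletableFuture<Boolean> ack(String msgOffset) {
        I messageId = msgOffset == null ? null : offsetCache.get(msgOffset);
        if (messageId == null) {
            ackFailTimes.incrementAndGet();
            return CompletableFuture.completedFuture(false);
        }
        try {
            return ackFunction.apply(messageId).handle((ignored, error) -> {
                if (error != null) {
                    ackFailTimes.incrementAndGet();
                    return false;
                }
                offsetCache.remove(msgOffset);
                ackSuccTimes.incrementAndGet();
                return true;
            });
        } catch (RuntimeException e) {
            // Synchronous failure, for example a consumer that is already closed.
            ackFailTimes.incrementAndGet();
            return CompletableFuture.completedFuture(false);
        }
    }
}
```

   Sort could then check the boolean result (or ignore it) without wrapping every ack call in
   a try/catch block.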

##########
File path: inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.sort.impl.pulsar;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
+import org.apache.inlong.sdk.sort.entity.InLongMessage;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.inlong.sdk.sort.util.StringUtil;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Messages;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.shade.org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InLongPulsarFetcherImpl extends InLongTopicFetcher {
+
+    private final Logger logger = LoggerFactory.getLogger(InLongPulsarFetcherImpl.class);
+    private final ReentrantReadWriteLock mainLock = new ReentrantReadWriteLock(true);
+    private final ConcurrentHashMap<String, MessageId> offsetCache = new ConcurrentHashMap<>();
+    private volatile boolean closed = false;
+    private Consumer<byte[]> consumer;
+    private volatile boolean stopConsume = false;
+    private volatile Thread fetchThread;
+    private long sleepTime = 0L;
+    private int emptyPollTimes = 0;
+
+    public InLongPulsarFetcherImpl(InLongTopic inLongTopic,
+            ClientContext context) {
+        super(inLongTopic, context);
+    }
+
+    @Override
+    public void stopConsume(boolean stopConsume) {
+        this.stopConsume = stopConsume;
+    }
+
+    @Override
+    public boolean isConsumeStop() {
+        return stopConsume;
+    }
+
+    @Override
+    public InLongTopic getInLongTopic() {
+        return inLongTopic;
+    }
+
+    @Override
+    public long getConsumedDataSize() {
+        return 0;
+    }
+
+    @Override
+    public long getAckedOffset() {
+        return 0;
+    }
+
+    private void ackSucc(String offset) {
+        offsetCache.remove(offset);
+        context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()).addAckSuccTimes(1);
+    }
+
+    /**
+     * ack Offset
+     *
+     * @param msgOffset String
+     */
+    @Override
+    public void ack(String msgOffset) throws Exception {
+        if (!StringUtils.isEmpty(msgOffset)) {
+            try {
+                if (consumer == null) {
+                    context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                            inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                            .addAckFailTimes(1);
+                    logger.error("consumer == null");
+                    return;
+                }
+                MessageId messageId = offsetCache.get(msgOffset);
+                if (messageId == null) {
+                    context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                            inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                            .addAckFailTimes(1);
+                    logger.error("messageId == null");
+                    return;
+                }
+                consumer.acknowledgeAsync(messageId)
+                        .thenAccept(consumer -> ackSucc(msgOffset))
+                        .exceptionally(exception -> {
+                            logger.error("ack fail:{}", msgOffset);
+                            context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                                    inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                                    .addAckFailTimes(1);
+                            return null;
+                        });
+            } catch (Exception e) {
+                context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                        inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()).addAckFailTimes(1);
+                logger.error(e.getMessage(), e);
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * create Consumer and fetch thread
+     *
+     * @return boolean
+     */
+    @Override
+    public boolean init(Object object) {
+        PulsarClient pulsarClient = (PulsarClient) object;
+        return createConsumer(pulsarClient);
+    }
+
+    private boolean createConsumer(PulsarClient client) {
+        try {
+            consumer = client.newConsumer(Schema.BYTES)
+                    .topic(inLongTopic.getTopic())
+                    .subscriptionName(context.getConfig().getSortTaskId())
+                    .subscriptionType(SubscriptionType.Shared)
+                    .startMessageIdInclusive()
+                    .ackTimeout(10, TimeUnit.SECONDS)
+                    .receiverQueueSize(context.getConfig().getPulsarReceiveQueueSize())
+                    .subscribe();
+
+            String threadName = "sort_sdk_fetch_thread_" + StringUtil.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss.SSS");
+            this.fetchThread = new Thread(new Fetcher(), threadName);

Review comment:
       Using a thread pool would be a better solution.
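
   A minimal sketch of what this could look like, assuming a single pool shared by all topic
   fetchers. FetchThreadPool is a hypothetical helper, not an existing sort-sdk class; only the
   thread name prefix is taken from the diff above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: one shared pool that runs the fetch loops of all topics. */
final class FetchThreadPool {

    private final AtomicInteger threadIndex = new AtomicInteger();
    private final ExecutorService executor;

    FetchThreadPool(int poolSize) {
        // Name threads with a running index instead of a formatted timestamp.
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, "sort_sdk_fetch_thread_" + threadIndex.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        };
        this.executor = Executors.newFixedThreadPool(poolSize, factory);
    }

    /** Hands a fetch loop to the pool instead of creating a dedicated thread for it. */
    void submit(Runnable fetchLoop) {
        executor.execute(fetchLoop);
    }

    /** Interrupts running loops and waits for them; returns false on timeout or interruption. */
    boolean shutdown(long timeout, TimeUnit unit) {
        executor.shutdownNow();
        try {
            return executor.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

   Note that a fixed pool only helps if it has at least as many threads as long-running fetch
   loops; otherwise each loop would need to be split into short tasks that fetch once and
   resubmit themselves.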

##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
##########
@@ -113,6 +122,16 @@ public void onFinished(final MessageRecord messageRecord) {
         }
     }
 
+    /**
+     * The callback function that SortSDK invoke when fetch messages batch
+     *
+     * @param messageRecord {@link List<MessageRecord>}
+     */
+    @Override
+    public void onFinishedBatch(List<MessageRecord> messageRecord) {
+        //TODO

Review comment:
       The TODO description is not clear.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] codecov-commenter commented on pull request #2142: [INLONG-2077] sort-sdk change pulsar consume mode from listener to fetch

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #2142:
URL: https://github.com/apache/incubator-inlong/pull/2142#issuecomment-1010584203


   # [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/2142?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#2142](https://codecov.io/gh/apache/incubator-inlong/pull/2142?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (5d55a58) into [master](https://codecov.io/gh/apache/incubator-inlong/commit/f51179a30f75a910f3d17e6192195753ed83f31a?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (f51179a) will **increase** coverage by `0.01%`.
   > The diff coverage is `n/a`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #2142      +/-   ##
   ============================================
   + Coverage     12.26%   12.28%   +0.01%     
   + Complexity     1159     1158       -1     
   ============================================
     Files           413      413              
     Lines         35215    35215              
     Branches       5542     5542              
   ============================================
   + Hits           4320     4326       +6     
   + Misses        30125    30119       -6     
     Partials        770      770              
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/2142?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../producer/qltystats/DefaultBrokerRcvQltyStats.java](https://codecov.io/gh/apache/incubator-inlong/pull/2142/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtY2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9pbmxvbmcvdHViZW1xL2NsaWVudC9wcm9kdWNlci9xbHR5c3RhdHMvRGVmYXVsdEJyb2tlclJjdlFsdHlTdGF0cy5qYXZh) | `45.70% <0.00%> (-0.40%)` | :arrow_down: |
   | [.../java/org/apache/flume/sink/tubemq/TubemqSink.java](https://codecov.io/gh/apache/incubator-inlong/pull/2142/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtY29ubmVjdG9ycy90dWJlbXEtY29ubmVjdG9yLWZsdW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9mbHVtZS9zaW5rL3R1YmVtcS9UdWJlbXFTaW5rLmphdmE=) | `55.42% <0.00%> (+4.00%)` | :arrow_up: |
   
   ------
   
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/2142?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [f51179a...5d55a58](https://codecov.io/gh/apache/incubator-inlong/pull/2142?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   

