You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/06/30 02:28:27 UTC

[inlong] branch master updated: [INLONG-4777][SortStandalone] Change ack policy from checking message count to checking every message (#4778)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f8c46a657 [INLONG-4777][SortStandalone] Change ack policy from checking message count to checking every message (#4778)
f8c46a657 is described below

commit f8c46a6575c5b4506235e02e5868e6786248ee53
Author: 卢春亮 <94...@qq.com>
AuthorDate: Thu Jun 30 10:28:22 2022 +0800

    [INLONG-4777][SortStandalone] Change ack policy from checking message count to checking every message (#4778)
---
 .../sort/standalone/config/holder/AckPolicy.java   | 77 ++++++++++++++++++++++
 .../config/holder/CommonPropertiesHolder.java      | 21 ++++--
 .../standalone/channel/BufferQueueChannel.java     |  2 +-
 .../standalone/channel/CacheMessageRecord.java     | 55 +++++++++++++++-
 .../sort/standalone/channel/ProfileEvent.java      | 45 ++++++++++---
 .../sort/standalone/sink/cls/ClsSinkContext.java   | 22 +++----
 .../sink/elasticsearch/EsSinkContext.java          | 22 +++----
 .../standalone/source/sortsdk/FetchCallback.java   | 29 +++-----
 .../sink/cls/TestDefaultEvent2LogItemHandler.java  |  3 +-
 .../sink/elasticsearch/TestEsSinkContext.java      |  2 +-
 10 files changed, 217 insertions(+), 61 deletions(-)

diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/AckPolicy.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/AckPolicy.java
new file mode 100644
index 000000000..6dc0127f8
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/AckPolicy.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.holder;
+
+/**
+ * AckPolicy
+ * 
+ */
+public enum AckPolicy {
+
+    COUNT(0), TOKEN(1);
+
+    private final int value;
+
+    /**
+     * Constructor
+     * @param value
+     */
+    private AckPolicy(int value) {
+        this.value = value;
+    }
+
+    /**
+     * getValue
+     * @return int
+     */
+    public int getValue() {
+        return value;
+    }
+
+    /**
+     * getAckPolicy
+     * @param value
+     * @return AckPolicy
+     */
+    public static AckPolicy getAckPolicy(int value) {
+        switch (value) {
+            case 0 :
+                return COUNT;
+            case 1 :
+                return TOKEN;
+            default :
+                return COUNT;
+        }
+    }
+
+    /**
+     * getAckPolicy
+     * @param name
+     * @return AckPolicy
+     */
+    public static AckPolicy getAckPolicy(String name) {
+        if (AckPolicy.COUNT.name().equalsIgnoreCase(name)) {
+            return AckPolicy.COUNT;
+        } else if (AckPolicy.TOKEN.name().equalsIgnoreCase(name)) {
+            return AckPolicy.TOKEN;
+        } else {
+            return AckPolicy.COUNT;
+        }
+    }
+
+}
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
index 05100fe68..129ce24f0 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
@@ -17,9 +17,6 @@
 
 package org.apache.inlong.sort.standalone.config.holder;
 
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.commons.lang3.ClassUtils;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.flume.Context;
@@ -28,6 +25,9 @@ import org.apache.inlong.sort.standalone.config.loader.CommonPropertiesLoader;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 import org.slf4j.Logger;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
 /**
  * 
  * CommonPropertiesHolder
@@ -38,11 +38,13 @@ public class CommonPropertiesHolder {
     public static final String DEFAULT_LOADER = ClassResourceCommonPropertiesLoader.class.getName();
     public static final String KEY_COMMON_PROPERTIES = "common_properties_loader";
     public static final String KEY_CLUSTER_ID = "clusterId";
+    public static final String KEY_SORT_SOURCE_ACKPOLICY = "sortSource.ackPolicy";
 
     private static Map<String, String> props;
     private static Context context;
 
     private static long auditFormatInterval = 60000L;
+    private static AckPolicy ackPolicy;
 
     /**
      * init
@@ -60,8 +62,11 @@ public class CommonPropertiesHolder {
                         CommonPropertiesLoader loader = (CommonPropertiesLoader) loaderObject;
                         props.putAll(loader.load());
                         LOG.info("loaderClass:{},properties:{}", loaderClassName, props);
-                        auditFormatInterval = NumberUtils
+                        CommonPropertiesHolder.auditFormatInterval = NumberUtils
                                 .toLong(CommonPropertiesHolder.getString("auditFormatInterval"), 60000L);
+                        String strAckPolicy = CommonPropertiesHolder.getString(KEY_SORT_SOURCE_ACKPOLICY,
+                                AckPolicy.COUNT.name());
+                        CommonPropertiesHolder.ackPolicy = AckPolicy.getAckPolicy(strAckPolicy);
                     }
                 } catch (Throwable t) {
                     LOG.error("Fail to init CommonPropertiesLoader,loaderClass:{},error:{}",
@@ -205,4 +210,12 @@ public class CommonPropertiesHolder {
         return auditFormatInterval;
     }
 
+    /**
+     * get ackPolicy
+     * @return the ackPolicy
+     */
+    public static AckPolicy getAckPolicy() {
+        return ackPolicy;
+    }
+
 }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java
index 52f2643d3..57ad61a81 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java
@@ -81,7 +81,7 @@ public class BufferQueueChannel extends AbstractChannel {
             ProfileEvent profile = (ProfileEvent) event;
             transaction.doPut(profile);
         } else {
-            ProfileEvent profile = new ProfileEvent(event.getBody(), event.getHeaders(), null);
+            ProfileEvent profile = new ProfileEvent(event.getHeaders(), event.getBody());
             transaction.doPut(profile);
         }
     }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/CacheMessageRecord.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/CacheMessageRecord.java
index 63dd51750..bda5f246e 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/CacheMessageRecord.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/CacheMessageRecord.java
@@ -19,9 +19,12 @@ package org.apache.inlong.sort.standalone.channel;
 
 import org.apache.inlong.sdk.sort.api.SortClient;
 import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.inlong.sort.standalone.config.holder.AckPolicy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -35,6 +38,8 @@ public class CacheMessageRecord {
     private final String msgKey;
     private final String offset;
     private final AtomicInteger ackCount;
+    private final AckPolicy ackPolicy;
+    private Set<Integer> tokenSet;
 
     /**
      * Constructor
@@ -42,17 +47,47 @@ public class CacheMessageRecord {
      * @param msgRecord
      * @param client
      */
-    public CacheMessageRecord(MessageRecord msgRecord, SortClient client) {
+    public CacheMessageRecord(MessageRecord msgRecord, SortClient client, AckPolicy ackPolicy) {
         this.msgKey = msgRecord.getMsgKey();
         this.offset = msgRecord.getOffset();
         this.ackCount = new AtomicInteger(msgRecord.getMsgs().size());
         this.client = client;
+        this.ackPolicy = ackPolicy;
+        if (AckPolicy.TOKEN.equals(ackPolicy)) {
+            this.tokenSet = new HashSet<>();
+            for (int i = 0; i < msgRecord.getMsgs().size(); i++) {
+                this.tokenSet.add(i);
+            }
+        }
+    }
+
+    /**
+     * getToken
+     * @return
+     */
+    public Integer getToken() {
+        if (AckPolicy.TOKEN.equals(ackPolicy)) {
+            return this.ackCount.decrementAndGet();
+        }
+        return 0;
     }
 
     /**
      * ackMessage
+     * @param ackToken ackToken
+     */
+    public void ackMessage(int ackToken) {
+        if (AckPolicy.TOKEN.equals(ackPolicy)) {
+            this.ackMessageByToken(ackToken);
+            return;
+        }
+        this.ackMessageByCount();
+    }
+
+    /**
+     * ackMessageByCount
      */
-    public void ackMessage() {
+    private void ackMessageByCount() {
         int result = this.ackCount.decrementAndGet();
         if (result == 0 && client != null) {
             try {
@@ -62,4 +97,20 @@ public class CacheMessageRecord {
             }
         }
     }
+
+    /**
+     * ackMessageByToken
+     * @param ackToken ackToken
+     */
+    private void ackMessageByToken(int ackToken) {
+        this.tokenSet.remove(ackToken);
+        int result = this.tokenSet.size();
+        if (result == 0 && client != null) {
+            try {
+                client.ack(msgKey, offset);
+            } catch (Exception e) {
+                LOG.error(e.getMessage(), e);
+            }
+        }
+    }
 }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java
index 27f7962bf..2dfc3dd73 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.standalone.channel;
 
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.flume.event.SimpleEvent;
+import org.apache.inlong.sdk.sort.entity.InLongMessage;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
 import org.apache.inlong.sort.standalone.utils.Constants;
 
@@ -35,25 +36,45 @@ public class ProfileEvent extends SimpleEvent {
     private final String uid;
 
     private final long rawLogTime;
+    private final String sourceIp;
     private final long fetchTime;
-    private final CacheMessageRecord cacheRecord;
+    private CacheMessageRecord cacheRecord;
+    private final int ackToken;
 
     /**
      * Constructor
-     * 
-     * @param body
      * @param headers
-     * @param cacheRecord
+     * @param body
      */
-    public ProfileEvent(byte[] body, Map<String, String> headers, CacheMessageRecord cacheRecord) {
-        super.setBody(body);
+    public ProfileEvent(Map<String, String> headers, byte[] body) {
         super.setHeaders(headers);
-        this.cacheRecord = cacheRecord;
+        super.setBody(body);
         this.inlongGroupId = headers.get(Constants.INLONG_GROUP_ID);
         this.inlongStreamId = headers.get(Constants.INLONG_STREAM_ID);
         this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
         this.fetchTime = System.currentTimeMillis();
         this.rawLogTime = NumberUtils.toLong(headers.get(Constants.HEADER_KEY_MSG_TIME), fetchTime);
+        this.sourceIp = headers.get(Constants.HEADER_KEY_SOURCE_IP);
+        this.ackToken = 0;
+    }
+
+    /**
+     * Constructor
+     * 
+     * @param sdkMessage
+     * @param cacheRecord
+     */
+    public ProfileEvent(InLongMessage sdkMessage, CacheMessageRecord cacheRecord) {
+        super.setHeaders(sdkMessage.getParams());
+        super.setBody(sdkMessage.getBody());
+        this.inlongGroupId = sdkMessage.getInlongGroupId();
+        this.inlongStreamId = sdkMessage.getInlongStreamId();
+        this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
+        this.rawLogTime = sdkMessage.getMsgTime();
+        this.sourceIp = sdkMessage.getSourceIp();
+        this.cacheRecord = cacheRecord;
+        this.fetchTime = System.currentTimeMillis();
+        this.ackToken = cacheRecord.getToken();
     }
 
     /**
@@ -83,6 +104,14 @@ public class ProfileEvent extends SimpleEvent {
         return rawLogTime;
     }
 
+    /**
+     * get sourceIp
+     * @return the sourceIp
+     */
+    public String getSourceIp() {
+        return sourceIp;
+    }
+
     /**
      * get fetchTime
      * 
@@ -115,7 +144,7 @@ public class ProfileEvent extends SimpleEvent {
      */
     public void ack() {
         if (cacheRecord != null) {
-            cacheRecord.ackMessage();
+            cacheRecord.ackMessage(ackToken);
         }
     }
 }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
index 244a4a426..65ce2cb93 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
@@ -25,7 +25,6 @@ import com.tencentcloudapi.cls.producer.errors.ProducerException;
 import com.tencentcloudapi.cls.producer.util.NetworkUtils;
 
 import org.apache.commons.lang3.ClassUtils;
-import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
@@ -268,25 +267,22 @@ public class ClsSinkContext extends SinkContext {
     public void addSendResultMetric(ProfileEvent currentRecord, String bid, boolean result, long sendTime) {
         Map<String, String> dimensions = this.getDimensions(currentRecord, bid);
         SortMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
-        long count = 1;
-        long size = currentRecord.getBody().length;
         if (result) {
-            metricItem.sendSuccessCount.addAndGet(count);
-            metricItem.sendSuccessSize.addAndGet(size);
+            metricItem.sendSuccessCount.incrementAndGet();
+            metricItem.sendSuccessSize.addAndGet(currentRecord.getBody().length);
             AuditUtils.add(AuditUtils.AUDIT_ID_SEND_SUCCESS, currentRecord);
             if (sendTime > 0) {
-                long currentTime = System.currentTimeMillis();
+                final long currentTime = System.currentTimeMillis();
                 long sinkDuration = currentTime - sendTime;
-                long nodeDuration = currentTime
-                        - NumberUtils.toLong(Constants.HEADER_KEY_SOURCE_TIME, currentRecord.getRawLogTime());
+                long nodeDuration = currentTime - currentRecord.getFetchTime();
                 long wholeDuration = currentTime - currentRecord.getRawLogTime();
-                metricItem.sinkDuration.addAndGet(sinkDuration * count);
-                metricItem.nodeDuration.addAndGet(nodeDuration * count);
-                metricItem.wholeDuration.addAndGet(wholeDuration * count);
+                metricItem.sinkDuration.addAndGet(sinkDuration);
+                metricItem.nodeDuration.addAndGet(nodeDuration);
+                metricItem.wholeDuration.addAndGet(wholeDuration);
             }
         } else {
-            metricItem.sendFailCount.addAndGet(count);
-            metricItem.sendFailSize.addAndGet(size);
+            metricItem.sendFailCount.incrementAndGet();
+            metricItem.sendFailSize.addAndGet(currentRecord.getBody().length);
         }
     }
 
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
index 3f6abbd4a..6f22a6876 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
@@ -228,28 +228,26 @@ public class EsSinkContext extends SinkContext {
         fillInlongId(currentRecord, dimensions);
         dimensions.put(SortMetricItem.KEY_SINK_ID, this.getSinkName());
         dimensions.put(SortMetricItem.KEY_SINK_DATA_ID, bid);
+        final long currentTime = System.currentTimeMillis();
         long msgTime = currentRecord.getRawLogTime();
         long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
         dimensions.put(SortMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
         SortMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
-        long count = 1;
-        long size = currentRecord.getBody().length;
         if (result) {
-            metricItem.sendSuccessCount.addAndGet(count);
-            metricItem.sendSuccessSize.addAndGet(size);
+            metricItem.sendSuccessCount.incrementAndGet();
+            metricItem.sendSuccessSize.addAndGet(currentRecord.getBody().length);
             AuditUtils.add(AuditUtils.AUDIT_ID_SEND_SUCCESS, currentRecord);
             if (sendTime > 0) {
-                long currentTime = System.currentTimeMillis();
                 long sinkDuration = currentTime - sendTime;
-                long nodeDuration = currentTime - NumberUtils.toLong(Constants.HEADER_KEY_SOURCE_TIME, msgTime);
-                long wholeDuration = currentTime - msgTime;
-                metricItem.sinkDuration.addAndGet(sinkDuration * count);
-                metricItem.nodeDuration.addAndGet(nodeDuration * count);
-                metricItem.wholeDuration.addAndGet(wholeDuration * count);
+                long nodeDuration = currentTime - currentRecord.getFetchTime();
+                long wholeDuration = currentTime - currentRecord.getRawLogTime();
+                metricItem.sinkDuration.addAndGet(sinkDuration);
+                metricItem.nodeDuration.addAndGet(nodeDuration);
+                metricItem.wholeDuration.addAndGet(wholeDuration);
             }
         } else {
-            metricItem.sendFailCount.addAndGet(count);
-            metricItem.sendFailSize.addAndGet(size);
+            metricItem.sendFailCount.incrementAndGet();
+            metricItem.sendFailSize.addAndGet(currentRecord.getBody().length);
         }
     }
 
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
index 458a22f4e..af09e1e9a 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
@@ -18,9 +18,7 @@
 package org.apache.inlong.sort.standalone.source.sortsdk;
 
 import com.google.common.base.Preconditions;
-import java.util.List;
-import javax.validation.constraints.NotBlank;
-import javax.validation.constraints.NotNull;
+
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.inlong.sdk.sort.api.ReadCallback;
 import org.apache.inlong.sdk.sort.api.SortClient;
@@ -28,21 +26,18 @@ import org.apache.inlong.sdk.sort.entity.InLongMessage;
 import org.apache.inlong.sdk.sort.entity.MessageRecord;
 import org.apache.inlong.sort.standalone.channel.CacheMessageRecord;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
+
 /**
  * Implementation of {@link ReadCallback}.
  *
- * TODO: Sort sdk should deliver one object which is held by {@link ProfileEvent} and used to ack upstream data store
- * The code should be like :
- *
- * public void onFinished(final MessageRecord messageRecord, ACKer acker) {
- * doSomething();
- * final ProfileEvent profileEvent = new ProfileEvent(result.getBody(), result.getHeaders(), acker);
- * channelProcessor.processEvent(profileEvent);
- * }
- *
  * The ACKer will be used to <b>ACK</b> upstream after that the downstream <b>ACKed</b> sort-standalone.
  * This process seems like <b>transaction</b> of the whole sort-standalone, and which
  * ensure <b>At Least One</b> semantics.
@@ -103,14 +98,10 @@ public class FetchCallback implements ReadCallback {
     public void onFinished(final MessageRecord messageRecord) {
         try {
             Preconditions.checkState(messageRecord != null, "Fetched msg is null.");
-            CacheMessageRecord cacheRecord = new CacheMessageRecord(messageRecord, client);
+            CacheMessageRecord cacheRecord = new CacheMessageRecord(messageRecord, client,
+                    CommonPropertiesHolder.getAckPolicy());
             for (InLongMessage inLongMessage : messageRecord.getMsgs()) {
-                final SubscribeFetchResult result = SubscribeFetchResult.Factory
-                        .create(sortTaskName, messageRecord.getMsgKey(), messageRecord.getOffset(),
-                                inLongMessage.getParams(), messageRecord.getRecTime(),
-                                inLongMessage.getBody());
-                final ProfileEvent profileEvent = new ProfileEvent(result.getBody(), result.getHeaders(), 
-                        cacheRecord);
+                final ProfileEvent profileEvent = new ProfileEvent(inLongMessage, cacheRecord);
                 channelProcessor.processEvent(profileEvent);
                 context.reportToMetric(profileEvent, sortTaskName, "-", SortSdkSourceContext.FetchResult.SUCCESS);
             }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java
index 9e8a69792..278fe10e0 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sort.standalone.sink.cls;
 
 import com.tencentcloudapi.cls.producer.common.LogItem;
+
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.utils.Constants;
 import org.junit.Assert;
@@ -85,7 +86,7 @@ public class TestDefaultEvent2LogItemHandler {
         headers.put(Constants.INLONG_GROUP_ID, "testGroup");
         headers.put(Constants.INLONG_STREAM_ID, "testStream");
         headers.put(Constants.HEADER_KEY_MSG_TIME, "1234456");
-        return new ProfileEvent(body, headers, null);
+        return new ProfileEvent(headers, body);
     }
 
 }
\ No newline at end of file
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
index 732f78484..7e76dd9f0 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
@@ -86,7 +86,7 @@ public class TestEsSinkContext {
         headers.put(Constants.HEADER_KEY_MSG_TIME, String.valueOf(System.currentTimeMillis()));
         headers.put(Constants.HEADER_KEY_SOURCE_IP, "127.0.0.1");
         byte[] body = content.getBytes(Charset.defaultCharset());
-        return new ProfileEvent(body, headers, null);
+        return new ProfileEvent(headers, body);
     }
 
     /**