You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/06/30 02:28:27 UTC
[inlong] branch master updated: [INLONG-4777][SortStandalone] Change ack policy from checking message count to checking every message (#4778)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f8c46a657 [INLONG-4777][SortStandalone] Change ack policy from checking message count to checking every message (#4778)
f8c46a657 is described below
commit f8c46a6575c5b4506235e02e5868e6786248ee53
Author: 卢春亮 <94...@qq.com>
AuthorDate: Thu Jun 30 10:28:22 2022 +0800
[INLONG-4777][SortStandalone] Change ack policy from checking message count to checking every message (#4778)
---
.../sort/standalone/config/holder/AckPolicy.java | 77 ++++++++++++++++++++++
.../config/holder/CommonPropertiesHolder.java | 21 ++++--
.../standalone/channel/BufferQueueChannel.java | 2 +-
.../standalone/channel/CacheMessageRecord.java | 55 +++++++++++++++-
.../sort/standalone/channel/ProfileEvent.java | 45 ++++++++++---
.../sort/standalone/sink/cls/ClsSinkContext.java | 22 +++----
.../sink/elasticsearch/EsSinkContext.java | 22 +++----
.../standalone/source/sortsdk/FetchCallback.java | 29 +++-----
.../sink/cls/TestDefaultEvent2LogItemHandler.java | 3 +-
.../sink/elasticsearch/TestEsSinkContext.java | 2 +-
10 files changed, 217 insertions(+), 61 deletions(-)
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/AckPolicy.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/AckPolicy.java
new file mode 100644
index 000000000..6dc0127f8
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/AckPolicy.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.holder;
+
+/**
+ * AckPolicy
+ *
+ */
+public enum AckPolicy {
+
+ COUNT(0), TOKEN(1);
+
+ private final int value;
+
+ /**
+ * Constructor
+ * @param value
+ */
+ private AckPolicy(int value) {
+ this.value = value;
+ }
+
+ /**
+ * getValue
+ * @return int
+ */
+ public int getValue() {
+ return value;
+ }
+
+ /**
+ * getAckPolicy
+ * @param value
+ * @return AckPolicy
+ */
+ public static AckPolicy getAckPolicy(int value) {
+ switch (value) {
+ case 0 :
+ return COUNT;
+ case 1 :
+ return TOKEN;
+ default :
+ return COUNT;
+ }
+ }
+
+ /**
+ * getAckPolicy
+ * @param name
+ * @return AckPolicy
+ */
+ public static AckPolicy getAckPolicy(String name) {
+ if (AckPolicy.COUNT.name().equalsIgnoreCase(name)) {
+ return AckPolicy.COUNT;
+ } else if (AckPolicy.TOKEN.name().equalsIgnoreCase(name)) {
+ return AckPolicy.TOKEN;
+ } else {
+ return AckPolicy.COUNT;
+ }
+ }
+
+}
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
index 05100fe68..129ce24f0 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
@@ -17,9 +17,6 @@
package org.apache.inlong.sort.standalone.config.holder;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.Context;
@@ -28,6 +25,9 @@ import org.apache.inlong.sort.standalone.config.loader.CommonPropertiesLoader;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.slf4j.Logger;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
/**
*
* CommonPropertiesHolder
@@ -38,11 +38,13 @@ public class CommonPropertiesHolder {
public static final String DEFAULT_LOADER = ClassResourceCommonPropertiesLoader.class.getName();
public static final String KEY_COMMON_PROPERTIES = "common_properties_loader";
public static final String KEY_CLUSTER_ID = "clusterId";
+ public static final String KEY_SORT_SOURCE_ACKPOLICY = "sortSource.ackPolicy";
private static Map<String, String> props;
private static Context context;
private static long auditFormatInterval = 60000L;
+ private static AckPolicy ackPolicy;
/**
* init
@@ -60,8 +62,11 @@ public class CommonPropertiesHolder {
CommonPropertiesLoader loader = (CommonPropertiesLoader) loaderObject;
props.putAll(loader.load());
LOG.info("loaderClass:{},properties:{}", loaderClassName, props);
- auditFormatInterval = NumberUtils
+ CommonPropertiesHolder.auditFormatInterval = NumberUtils
.toLong(CommonPropertiesHolder.getString("auditFormatInterval"), 60000L);
+ String strAckPolicy = CommonPropertiesHolder.getString(KEY_SORT_SOURCE_ACKPOLICY,
+ AckPolicy.COUNT.name());
+ CommonPropertiesHolder.ackPolicy = AckPolicy.getAckPolicy(strAckPolicy);
}
} catch (Throwable t) {
LOG.error("Fail to init CommonPropertiesLoader,loaderClass:{},error:{}",
@@ -205,4 +210,12 @@ public class CommonPropertiesHolder {
return auditFormatInterval;
}
+ /**
+ * get ackPolicy
+ * @return the ackPolicy
+ */
+ public static AckPolicy getAckPolicy() {
+ return ackPolicy;
+ }
+
}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java
index 52f2643d3..57ad61a81 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java
@@ -81,7 +81,7 @@ public class BufferQueueChannel extends AbstractChannel {
ProfileEvent profile = (ProfileEvent) event;
transaction.doPut(profile);
} else {
- ProfileEvent profile = new ProfileEvent(event.getBody(), event.getHeaders(), null);
+ ProfileEvent profile = new ProfileEvent(event.getHeaders(), event.getBody());
transaction.doPut(profile);
}
}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/CacheMessageRecord.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/CacheMessageRecord.java
index 63dd51750..bda5f246e 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/CacheMessageRecord.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/CacheMessageRecord.java
@@ -19,9 +19,12 @@ package org.apache.inlong.sort.standalone.channel;
import org.apache.inlong.sdk.sort.api.SortClient;
import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.inlong.sort.standalone.config.holder.AckPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -35,6 +38,8 @@ public class CacheMessageRecord {
private final String msgKey;
private final String offset;
private final AtomicInteger ackCount;
+ private final AckPolicy ackPolicy;
+ private Set<Integer> tokenSet;
/**
* Constructor
@@ -42,17 +47,47 @@ public class CacheMessageRecord {
* @param msgRecord
* @param client
*/
- public CacheMessageRecord(MessageRecord msgRecord, SortClient client) {
+ public CacheMessageRecord(MessageRecord msgRecord, SortClient client, AckPolicy ackPolicy) {
this.msgKey = msgRecord.getMsgKey();
this.offset = msgRecord.getOffset();
this.ackCount = new AtomicInteger(msgRecord.getMsgs().size());
this.client = client;
+ this.ackPolicy = ackPolicy;
+ if (AckPolicy.TOKEN.equals(ackPolicy)) {
+ this.tokenSet = new HashSet<>();
+ for (int i = 0; i < msgRecord.getMsgs().size(); i++) {
+ this.tokenSet.add(i);
+ }
+ }
+ }
+
+ /**
+ * getToken
+ * @return
+ */
+ public Integer getToken() {
+ if (AckPolicy.TOKEN.equals(ackPolicy)) {
+ return this.ackCount.decrementAndGet();
+ }
+ return 0;
}
/**
* ackMessage
+ * @param ackToken ackToken
+ */
+ public void ackMessage(int ackToken) {
+ if (AckPolicy.TOKEN.equals(ackPolicy)) {
+ this.ackMessageByToken(ackToken);
+ return;
+ }
+ this.ackMessageByCount();
+ }
+
+ /**
+ * ackMessageByCount
*/
- public void ackMessage() {
+ private void ackMessageByCount() {
int result = this.ackCount.decrementAndGet();
if (result == 0 && client != null) {
try {
@@ -62,4 +97,20 @@ public class CacheMessageRecord {
}
}
}
+
+ /**
+ * ackMessageByToken
+ * @param ackToken ackToken
+ */
+ private void ackMessageByToken(int ackToken) {
+ this.tokenSet.remove(ackToken);
+ int result = this.tokenSet.size();
+ if (result == 0 && client != null) {
+ try {
+ client.ack(msgKey, offset);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+ }
}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java
index 27f7962bf..2dfc3dd73 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.standalone.channel;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.event.SimpleEvent;
+import org.apache.inlong.sdk.sort.entity.InLongMessage;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import org.apache.inlong.sort.standalone.utils.Constants;
@@ -35,25 +36,45 @@ public class ProfileEvent extends SimpleEvent {
private final String uid;
private final long rawLogTime;
+ private final String sourceIp;
private final long fetchTime;
- private final CacheMessageRecord cacheRecord;
+ private CacheMessageRecord cacheRecord;
+ private final int ackToken;
/**
* Constructor
- *
- * @param body
* @param headers
- * @param cacheRecord
+ * @param body
*/
- public ProfileEvent(byte[] body, Map<String, String> headers, CacheMessageRecord cacheRecord) {
- super.setBody(body);
+ public ProfileEvent(Map<String, String> headers, byte[] body) {
super.setHeaders(headers);
- this.cacheRecord = cacheRecord;
+ super.setBody(body);
this.inlongGroupId = headers.get(Constants.INLONG_GROUP_ID);
this.inlongStreamId = headers.get(Constants.INLONG_STREAM_ID);
this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
this.fetchTime = System.currentTimeMillis();
this.rawLogTime = NumberUtils.toLong(headers.get(Constants.HEADER_KEY_MSG_TIME), fetchTime);
+ this.sourceIp = headers.get(Constants.HEADER_KEY_SOURCE_IP);
+ this.ackToken = 0;
+ }
+
+ /**
+ * Constructor
+ *
+ * @param sdkMessage
+ * @param cacheRecord
+ */
+ public ProfileEvent(InLongMessage sdkMessage, CacheMessageRecord cacheRecord) {
+ super.setHeaders(sdkMessage.getParams());
+ super.setBody(sdkMessage.getBody());
+ this.inlongGroupId = sdkMessage.getInlongGroupId();
+ this.inlongStreamId = sdkMessage.getInlongStreamId();
+ this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
+ this.rawLogTime = sdkMessage.getMsgTime();
+ this.sourceIp = sdkMessage.getSourceIp();
+ this.cacheRecord = cacheRecord;
+ this.fetchTime = System.currentTimeMillis();
+ this.ackToken = cacheRecord.getToken();
}
/**
@@ -83,6 +104,14 @@ public class ProfileEvent extends SimpleEvent {
return rawLogTime;
}
+ /**
+ * get sourceIp
+ * @return the sourceIp
+ */
+ public String getSourceIp() {
+ return sourceIp;
+ }
+
/**
* get fetchTime
*
@@ -115,7 +144,7 @@ public class ProfileEvent extends SimpleEvent {
*/
public void ack() {
if (cacheRecord != null) {
- cacheRecord.ackMessage();
+ cacheRecord.ackMessage(ackToken);
}
}
}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
index 244a4a426..65ce2cb93 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
@@ -25,7 +25,6 @@ import com.tencentcloudapi.cls.producer.errors.ProducerException;
import com.tencentcloudapi.cls.producer.util.NetworkUtils;
import org.apache.commons.lang3.ClassUtils;
-import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
@@ -268,25 +267,22 @@ public class ClsSinkContext extends SinkContext {
public void addSendResultMetric(ProfileEvent currentRecord, String bid, boolean result, long sendTime) {
Map<String, String> dimensions = this.getDimensions(currentRecord, bid);
SortMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
- long count = 1;
- long size = currentRecord.getBody().length;
if (result) {
- metricItem.sendSuccessCount.addAndGet(count);
- metricItem.sendSuccessSize.addAndGet(size);
+ metricItem.sendSuccessCount.incrementAndGet();
+ metricItem.sendSuccessSize.addAndGet(currentRecord.getBody().length);
AuditUtils.add(AuditUtils.AUDIT_ID_SEND_SUCCESS, currentRecord);
if (sendTime > 0) {
- long currentTime = System.currentTimeMillis();
+ final long currentTime = System.currentTimeMillis();
long sinkDuration = currentTime - sendTime;
- long nodeDuration = currentTime
- - NumberUtils.toLong(Constants.HEADER_KEY_SOURCE_TIME, currentRecord.getRawLogTime());
+ long nodeDuration = currentTime - currentRecord.getFetchTime();
long wholeDuration = currentTime - currentRecord.getRawLogTime();
- metricItem.sinkDuration.addAndGet(sinkDuration * count);
- metricItem.nodeDuration.addAndGet(nodeDuration * count);
- metricItem.wholeDuration.addAndGet(wholeDuration * count);
+ metricItem.sinkDuration.addAndGet(sinkDuration);
+ metricItem.nodeDuration.addAndGet(nodeDuration);
+ metricItem.wholeDuration.addAndGet(wholeDuration);
}
} else {
- metricItem.sendFailCount.addAndGet(count);
- metricItem.sendFailSize.addAndGet(size);
+ metricItem.sendFailCount.incrementAndGet();
+ metricItem.sendFailSize.addAndGet(currentRecord.getBody().length);
}
}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
index 3f6abbd4a..6f22a6876 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
@@ -228,28 +228,26 @@ public class EsSinkContext extends SinkContext {
fillInlongId(currentRecord, dimensions);
dimensions.put(SortMetricItem.KEY_SINK_ID, this.getSinkName());
dimensions.put(SortMetricItem.KEY_SINK_DATA_ID, bid);
+ final long currentTime = System.currentTimeMillis();
long msgTime = currentRecord.getRawLogTime();
long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
dimensions.put(SortMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
SortMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
- long count = 1;
- long size = currentRecord.getBody().length;
if (result) {
- metricItem.sendSuccessCount.addAndGet(count);
- metricItem.sendSuccessSize.addAndGet(size);
+ metricItem.sendSuccessCount.incrementAndGet();
+ metricItem.sendSuccessSize.addAndGet(currentRecord.getBody().length);
AuditUtils.add(AuditUtils.AUDIT_ID_SEND_SUCCESS, currentRecord);
if (sendTime > 0) {
- long currentTime = System.currentTimeMillis();
long sinkDuration = currentTime - sendTime;
- long nodeDuration = currentTime - NumberUtils.toLong(Constants.HEADER_KEY_SOURCE_TIME, msgTime);
- long wholeDuration = currentTime - msgTime;
- metricItem.sinkDuration.addAndGet(sinkDuration * count);
- metricItem.nodeDuration.addAndGet(nodeDuration * count);
- metricItem.wholeDuration.addAndGet(wholeDuration * count);
+ long nodeDuration = currentTime - currentRecord.getFetchTime();
+ long wholeDuration = currentTime - currentRecord.getRawLogTime();
+ metricItem.sinkDuration.addAndGet(sinkDuration);
+ metricItem.nodeDuration.addAndGet(nodeDuration);
+ metricItem.wholeDuration.addAndGet(wholeDuration);
}
} else {
- metricItem.sendFailCount.addAndGet(count);
- metricItem.sendFailSize.addAndGet(size);
+ metricItem.sendFailCount.incrementAndGet();
+ metricItem.sendFailSize.addAndGet(currentRecord.getBody().length);
}
}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
index 458a22f4e..af09e1e9a 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
@@ -18,9 +18,7 @@
package org.apache.inlong.sort.standalone.source.sortsdk;
import com.google.common.base.Preconditions;
-import java.util.List;
-import javax.validation.constraints.NotBlank;
-import javax.validation.constraints.NotNull;
+
import org.apache.flume.channel.ChannelProcessor;
import org.apache.inlong.sdk.sort.api.ReadCallback;
import org.apache.inlong.sdk.sort.api.SortClient;
@@ -28,21 +26,18 @@ import org.apache.inlong.sdk.sort.entity.InLongMessage;
import org.apache.inlong.sdk.sort.entity.MessageRecord;
import org.apache.inlong.sort.standalone.channel.CacheMessageRecord;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
+
/**
* Implementation of {@link ReadCallback}.
*
- * TODO: Sort sdk should deliver one object which is held by {@link ProfileEvent} and used to ack upstream data store
- * The code should be like :
- *
- * public void onFinished(final MessageRecord messageRecord, ACKer acker) {
- * doSomething();
- * final ProfileEvent profileEvent = new ProfileEvent(result.getBody(), result.getHeaders(), acker);
- * channelProcessor.processEvent(profileEvent);
- * }
- *
* The ACKer will be used to <b>ACK</b> upstream after that the downstream <b>ACKed</b> sort-standalone.
* This process seems like <b>transaction</b> of the whole sort-standalone, and which
* ensure <b>At Least One</b> semantics.
@@ -103,14 +98,10 @@ public class FetchCallback implements ReadCallback {
public void onFinished(final MessageRecord messageRecord) {
try {
Preconditions.checkState(messageRecord != null, "Fetched msg is null.");
- CacheMessageRecord cacheRecord = new CacheMessageRecord(messageRecord, client);
+ CacheMessageRecord cacheRecord = new CacheMessageRecord(messageRecord, client,
+ CommonPropertiesHolder.getAckPolicy());
for (InLongMessage inLongMessage : messageRecord.getMsgs()) {
- final SubscribeFetchResult result = SubscribeFetchResult.Factory
- .create(sortTaskName, messageRecord.getMsgKey(), messageRecord.getOffset(),
- inLongMessage.getParams(), messageRecord.getRecTime(),
- inLongMessage.getBody());
- final ProfileEvent profileEvent = new ProfileEvent(result.getBody(), result.getHeaders(),
- cacheRecord);
+ final ProfileEvent profileEvent = new ProfileEvent(inLongMessage, cacheRecord);
channelProcessor.processEvent(profileEvent);
context.reportToMetric(profileEvent, sortTaskName, "-", SortSdkSourceContext.FetchResult.SUCCESS);
}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java
index 9e8a69792..278fe10e0 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sort.standalone.sink.cls;
import com.tencentcloudapi.cls.producer.common.LogItem;
+
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.utils.Constants;
import org.junit.Assert;
@@ -85,7 +86,7 @@ public class TestDefaultEvent2LogItemHandler {
headers.put(Constants.INLONG_GROUP_ID, "testGroup");
headers.put(Constants.INLONG_STREAM_ID, "testStream");
headers.put(Constants.HEADER_KEY_MSG_TIME, "1234456");
- return new ProfileEvent(body, headers, null);
+ return new ProfileEvent(headers, body);
}
}
\ No newline at end of file
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
index 732f78484..7e76dd9f0 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
@@ -86,7 +86,7 @@ public class TestEsSinkContext {
headers.put(Constants.HEADER_KEY_MSG_TIME, String.valueOf(System.currentTimeMillis()));
headers.put(Constants.HEADER_KEY_SOURCE_IP, "127.0.0.1");
byte[] body = content.getBytes(Charset.defaultCharset());
- return new ProfileEvent(body, headers, null);
+ return new ProfileEvent(headers, body);
}
/**