You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/07 09:22:53 UTC

[incubator-inlong] branch master updated: [INLONG-2651][Sort] Add CLS sink and UT (#2919)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new dc7be87  [INLONG-2651][Sort] Add CLS sink and UT (#2919)
dc7be87 is described below

commit dc7be87629b6b0a96e4262d4fc2d5629e3d30e0a
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Mon Mar 7 17:22:00 2022 +0800

    [INLONG-2651][Sort] Add CLS sink and UT (#2919)
---
 .../sort/standalone/sink/cls/ClsCallback.java      |  82 ++++++++++
 .../sort/standalone/sink/cls/ClsChannelWorker.java | 168 +++++++++++++++++++++
 .../sort/standalone/sink/cls/ClsIdConfig.java      |   4 +-
 .../inlong/sort/standalone/sink/cls/ClsSink.java   |  98 ++++++++++++
 .../sort/standalone/sink/cls/ClsSinkContext.java   |  15 +-
 .../sink/kafka/KafkaFederationWorker.java          |   2 +
 .../sort/standalone/sink/cls/TestClsIdConfig.java  |  40 +++++
 .../sink/cls/TestDefaultEvent2LogItemHandler.java  |  91 +++++++++++
 8 files changed, 494 insertions(+), 6 deletions(-)

diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsCallback.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsCallback.java
new file mode 100644
index 0000000..9f900be
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsCallback.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.cls;
+
+import com.tencentcloudapi.cls.producer.Callback;
+import com.tencentcloudapi.cls.producer.Result;
+import org.apache.flume.Transaction;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * CLS producer {@link Callback} bound to one event: when the send completes it
+ * finishes the event's channel transaction and reports the send result metric.
+ */
+public class ClsCallback implements Callback {
+
+    private static final Logger LOG = InlongLoggerFactory.getLogger(ClsCallback.class);
+
+    private final Transaction tx;
+    private final ClsSinkContext context;
+    private final ProfileEvent event;
+    private final String topicId;
+
+    /**
+     * Creates the callback; the topic id is read once from the event header
+     * {@link ClsSinkContext#KEY_TOPIC_ID}.
+     *
+     * @param tx Transaction to commit or roll back.
+     * @param context Context used to report the send result metric.
+     * @param event Related event.
+     */
+    public ClsCallback(Transaction tx, ClsSinkContext context, ProfileEvent event) {
+        this.tx = tx;
+        this.context = context;
+        this.event = event;
+        this.topicId = event.getHeaders().get(ClsSinkContext.KEY_TOPIC_ID);
+    }
+
+    /** Routes the send result to the success or the failure path. */
+    @Override
+    public void onCompletion(Result result) {
+        if (result.isSuccessful()) {
+            onSuccess();
+        } else {
+            onFailed(result);
+        }
+    }
+
+    /** Reports the success metric, then commits and closes the transaction. */
+    private void onSuccess() {
+        context.addSendResultMetric(event, topicId, true, System.currentTimeMillis());
+        tx.commit();
+        tx.close();
+    }
+
+    /**
+     * Rolls back and closes the transaction, logs the result, then reports the failure metric.
+     * @param result Send result.
+     */
+    private void onFailed(Result result) {
+        tx.rollback();
+        tx.close();
+        LOG.error(result.toString());
+        context.addSendResultMetric(event, topicId, false, System.currentTimeMillis());
+    }
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsChannelWorker.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsChannelWorker.java
new file mode 100644
index 0000000..263d83f
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsChannelWorker.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.cls;
+
+import com.google.common.base.Preconditions;
+import com.tencentcloudapi.cls.producer.AsyncProducerClient;
+import com.tencentcloudapi.cls.producer.common.LogItem;
+import com.tencentcloudapi.cls.producer.errors.ProducerException;
+import org.apache.flume.Channel;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+import java.util.List;
+
+/**
+ * Worker thread that takes events from the sink channel and sends them to CLS asynchronously.
+ */
+public class ClsChannelWorker extends Thread {
+    private static final Logger LOG = InlongLoggerFactory.getLogger(ClsChannelWorker.class);
+
+    private final ClsSinkContext context;
+    private final String workerName;
+    private final Channel channel;
+    private final IEvent2LogItemHandler handler;
+    // volatile: written by the thread that calls close(), read by the loop in run().
+    private volatile LifecycleState status;
+
+    /**
+     * Constructor.
+     *
+     * @param sinkName Sink name, used as the prefix of the worker name.
+     * @param context Cls context; must provide a non-null channel and log item handler.
+     * @param workerIndex Index of cls channel worker, used as the suffix of the worker name.
+     */
+    public ClsChannelWorker(String sinkName, ClsSinkContext context, int workerIndex) {
+        this.context = Preconditions.checkNotNull(context);
+        this.workerName = sinkName + "-" + workerIndex;
+        this.channel = Preconditions.checkNotNull(context.getChannel());
+        this.handler = Preconditions.checkNotNull(context.getLogItemHandler());
+        this.status = LifecycleState.IDLE;
+    }
+
+    @Override
+    public void start() {
+        LOG.info("Start new cls channel worker {}", this.workerName);
+        status = LifecycleState.START;
+        super.start();
+    }
+
+    /**
+     * Close cls channel worker; the loop in run() exits once it observes the STOP status.
+     */
+    public void close() {
+        LOG.info("Close cls channel worker {}", this.workerName);
+        status = LifecycleState.STOP;
+    }
+
+    /**
+     * Run until status is STOP or the worker thread is interrupted.
+     */
+    @Override
+    public void run() {
+        LOG.info("worker {} start to run, the state is {}", this.workerName, status.name());
+        while (status != LifecycleState.STOP && !Thread.currentThread().isInterrupted()) {
+            doRun();
+        }
+    }
+
+    /**
+     * Take one event within a new transaction and send it; roll back on any exception.
+     */
+    private void doRun() {
+        Transaction tx = null;
+        try {
+            tx = channel.getTransaction();
+            tx.begin();
+            Event rowEvent = channel.take();
+
+            // if event is null, close tx and sleep for a while.
+            if (rowEvent == null) {
+                this.commitTransaction(tx);
+                sleepOneInterval();
+                return;
+            }
+            // events that are not ProfileEvent cannot be sent: commit to drop them.
+            if (!(rowEvent instanceof ProfileEvent)) {
+                this.commitTransaction(tx);
+                LOG.error("The type of row event is not compatible with ProfileEvent");
+                return;
+            }
+            // do send; the transaction is then finished by the ClsCallback.
+            this.send(rowEvent, tx);
+
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+            this.rollbackTransaction(tx);
+            if (e instanceof InterruptedException) {
+                // throwing cleared the interrupt flag; restore it so that run() can exit.
+                Thread.currentThread().interrupt();
+                return;
+            }
+            sleepOneInterval();
+        }
+    }
+
+    /**
+     * Send event to Cls
+     *
+     * @param rowEvent Row event; must be a ProfileEvent.
+     * @param tx Transaction handed over to the ClsCallback.
+     * @throws ProducerException If handing the logs to the CLS client fails.
+     * @throws InterruptedException If interrupted while handing the logs to the CLS client.
+     */
+    private void send(Event rowEvent, Transaction tx) throws ProducerException, InterruptedException {
+        ProfileEvent event = (ProfileEvent) rowEvent;
+        ClsIdConfig idConfig = context.getIdConfig(event.getUid());
+        event.getHeaders().put(ClsSinkContext.KEY_TOPIC_ID, idConfig.getTopicId());
+        AsyncProducerClient client = context.getClient(idConfig.getSecretId());
+        List<LogItem> record = handler.parse(context, event);
+        ClsCallback callback = new ClsCallback(tx, context, event);
+        client.putLogs(idConfig.getTopicId(), record, callback);
+    }
+
+    /** Sleep one process interval; an interrupt is restored so that run() can exit. */
+    private void sleepOneInterval() {
+        try {
+            Thread.sleep(context.getProcessInterval());
+        } catch (InterruptedException e1) {
+            LOG.error(e1.getMessage(), e1);
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /** Roll back and close the transaction if it exists. */
+    private void rollbackTransaction(Transaction tx) {
+        if (tx != null) {
+            tx.rollback();
+            tx.close();
+        }
+    }
+
+    /** Commit and close the transaction if it exists. */
+    private void commitTransaction(Transaction tx) {
+        if (tx != null) {
+            tx.commit();
+            tx.close();
+        }
+    }
+
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
index abcae88..04f67c3 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.sort.standalone.sink.cls;
 
-import lombok.Getter;
+import lombok.Data;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -26,7 +26,7 @@ import java.util.List;
 /**
  * Cls config of each uid.
  */
-@Getter
+@Data
 public class ClsIdConfig {
     private String inlongGroupId;
     private String inlongStreamId;
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSink.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSink.java
new file mode 100644
index 0000000..cc1e976
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSink.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.cls;
+
+import org.apache.flume.Context;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Cls Sink implementation.
+ *
+ * <p>Responsible for the initialization of {@link ClsChannelWorker}.</p>
+ */
+public class ClsSink extends AbstractSink implements Configurable {
+    private static final Logger LOG = LoggerFactory.getLogger(ClsSink.class);
+
+    private Context parentContext;
+    private ClsSinkContext context;
+    private final List<ClsChannelWorker> workers = new java.util.ArrayList<>();
+
+    /**
+     * Create and start the sink context, then start the {@link ClsChannelWorker}s.
+     */
+    @Override
+    public void start() {
+        super.start();
+        try {
+            this.context = new ClsSinkContext(getName(), parentContext, getChannel());
+            this.context.start();
+            for (int i = 0; i < context.getMaxThreads(); i++) {
+                ClsChannelWorker worker = new ClsChannelWorker(getName(), context, i);
+                this.workers.add(worker);
+                worker.start();
+            }
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /** Close the context (if it was created), then stop every {@link ClsChannelWorker}. */
+    @Override
+    public void stop() {
+        super.stop();
+        try {
+            if (this.context != null) {
+                this.context.close();
+            }
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        } finally {
+            // always signal the workers, even if closing the context failed.
+            for (ClsChannelWorker worker : this.workers) {
+                worker.close();
+            }
+            this.workers.clear();
+        }
+    }
+
+    /**
+     * Always backs off, because events are taken by the {@link ClsChannelWorker} threads.
+     * @return {@link Status#BACKOFF}
+     * @throws EventDeliveryException Never thrown by this implementation.
+     */
+    @Override
+    public Status process() throws EventDeliveryException {
+        return Status.BACKOFF;
+    }
+
+    /**
+     * Config parent context.
+     * @param context Parent context, kept until start() builds the sink context from it.
+     */
+    @Override
+    public void configure(Context context) {
+        LOG.info("start to configure:{}, context:{}.", this.getName(), context.toString());
+        this.parentContext = context;
+    }
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
index f068100..9872ab9 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
@@ -44,7 +44,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 /**
@@ -66,6 +65,7 @@ public class ClsSinkContext extends SinkContext {
     private static final String KEY_MAX_RETRY_BACKOFF_MS = "maxRetryBackoffMs";
     private static final String KEY_MAX_KEYWORD_LENGTH = "maxKeywordLength";
     private static final String KEY_EVENT_LOG_ITEM_HANDLER = "logItemHandler";
+    public static final String KEY_TOPIC_ID = "topicId";
 
     private static final int DEFAULT_KEYWORD_MAX_LENGTH = 32 * 1024 - 1;
     private int keywordMaxLength = DEFAULT_KEYWORD_MAX_LENGTH;
@@ -74,9 +74,6 @@ public class ClsSinkContext extends SinkContext {
     private List<AsyncProducerClient> deletingClients;
     private Context sinkContext;
     private Map<String, ClsIdConfig> idConfigMap = new ConcurrentHashMap<>();
-    private AtomicLong offerCounter = new AtomicLong(0);
-    private AtomicLong takeCounter = new AtomicLong(0);
-    private AtomicLong backCounter = new AtomicLong(0);
     private IEvent2LogItemHandler event2LogItemHandler;
 
     /**
@@ -332,4 +329,14 @@ public class ClsSinkContext extends SinkContext {
     public IEvent2LogItemHandler getLogItemHandler() {
         return event2LogItemHandler;
     }
+
+    /**
+     * Look up the cls client stored in the client map under the given secret id.
+     *
+     * @param secretId Secret id used as the key of the client map.
+     * @return Client instance; presumably null if none is stored for the id - TODO confirm.
+     */
+    public AsyncProducerClient getClient(String secretId) {
+        return clientMap.get(secretId);
+    }
 }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.java
index 14d76b7..7ce0cf3 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.java
@@ -105,6 +105,8 @@ public class KafkaFederationWorker extends Thread {
                     continue;
                 }
                 if (!(rowEvent instanceof ProfileEvent)) {
+                    tx.commit();
+                    tx.close();
                     LOG.error("The type of row event is not compatible with ProfileEvent");
                     continue;
                 }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestClsIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestClsIdConfig.java
new file mode 100644
index 0000000..7e9cc8d
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestClsIdConfig.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.cls;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.List;
+
+/** Unit test of {@link ClsIdConfig}. */
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.management.*")
+public class TestClsIdConfig {
+    /** Seven space-separated field names are expected to give a field list of size seven. */
+    @Test
+    public void testGetFieldList() {
+        ClsIdConfig config = new ClsIdConfig();
+        config.setFieldNames("1 2 3 4 5 6 7");
+        List<String> names = config.getFieldList();
+        Assert.assertEquals(7, names.size());
+    }
+}
\ No newline at end of file
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java
new file mode 100644
index 0000000..211c8f0
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.cls;
+
+import com.tencentcloudapi.cls.producer.common.LogItem;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.utils.Constants;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Unit test of {@link DefaultEvent2LogItemHandler} against a mocked {@link ClsSinkContext}. */
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.management.*")
+@PrepareForTest({ClsSinkContext.class, LogItem.class})
+public class TestDefaultEvent2LogItemHandler {
+    private ClsIdConfig idConfig;
+    private ProfileEvent event;
+    private DefaultEvent2LogItemHandler handler;
+    private ClsSinkContext mockContext;
+
+    /** Build a fresh handler, mocked context, event and id config before each test. */
+    @Before
+    public void setUp() {
+        handler = new DefaultEvent2LogItemHandler();
+        mockContext = PowerMockito.mock(ClsSinkContext.class);
+        event = prepareEvent();
+        idConfig = prepareIdConfig();
+    }
+
+    /** With no id config stubbed on the mocked context, parse is expected to return null. */
+    @Test
+    public void testNoIdConfig() {
+        Assert.assertNull(handler.parse(mockContext, event));
+    }
+
+    // Not run: the annotation below is commented out, and the method only prints the size.
+    // @Test
+    public void testNormal() {
+        PowerMockito.when(mockContext.getIdConfig(Mockito.anyString())).thenReturn(idConfig);
+        PowerMockito.when(mockContext.getKeywordMaxLength()).thenReturn(8 * 1024);
+        List<LogItem> items = handler.parse(mockContext, event);
+        System.out.println(items.size());
+    }
+
+    private ClsIdConfig prepareIdConfig() {
+        ClsIdConfig cfg = new ClsIdConfig();
+        cfg.setInlongGroupId("testGroup");
+        cfg.setInlongStreamId("testStream");
+        cfg.setFieldNames("f1 f2 f3 f4 f5 f6 f7 f8");
+        cfg.setTopicId("testTopicId");
+        cfg.setEndpoint("testEndPoint");
+        cfg.setSecretId("testSecretId");
+        cfg.setSecretKey("testSecretKey");
+        return cfg;
+    }
+
+    private ProfileEvent prepareEvent() {
+        Map<String, String> headers = new HashMap<>();
+        headers.put(Constants.INLONG_GROUP_ID, "testGroup");
+        headers.put(Constants.INLONG_STREAM_ID, "testStream");
+        headers.put(Constants.HEADER_KEY_MSG_TIME, "1234456");
+        return new ProfileEvent("v1|v2|v3|v4|v5|v6|v7|v8".getBytes(StandardCharsets.UTF_8), headers);
+    }
+}
\ No newline at end of file