You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/07 09:22:53 UTC
[incubator-inlong] branch master updated: [INLONG-2651][Sort] Add CLS sink and UT (#2919)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new dc7be87 [INLONG-2651][Sort] Add CLS sink and UT (#2919)
dc7be87 is described below
commit dc7be87629b6b0a96e4262d4fc2d5629e3d30e0a
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Mon Mar 7 17:22:00 2022 +0800
[INLONG-2651][Sort] Add CLS sink and UT (#2919)
---
.../sort/standalone/sink/cls/ClsCallback.java | 82 ++++++++++
.../sort/standalone/sink/cls/ClsChannelWorker.java | 168 +++++++++++++++++++++
.../sort/standalone/sink/cls/ClsIdConfig.java | 4 +-
.../inlong/sort/standalone/sink/cls/ClsSink.java | 98 ++++++++++++
.../sort/standalone/sink/cls/ClsSinkContext.java | 15 +-
.../sink/kafka/KafkaFederationWorker.java | 2 +
.../sort/standalone/sink/cls/TestClsIdConfig.java | 40 +++++
.../sink/cls/TestDefaultEvent2LogItemHandler.java | 91 +++++++++++
8 files changed, 494 insertions(+), 6 deletions(-)
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsCallback.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsCallback.java
new file mode 100644
index 0000000..9f900be
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsCallback.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.cls;
+
+import com.tencentcloudapi.cls.producer.Callback;
+import com.tencentcloudapi.cls.producer.Result;
+import org.apache.flume.Transaction;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * Implementation of CLS {@link Callback}.
+ */
+public class ClsCallback implements Callback {
+
+ private static final Logger LOG = InlongLoggerFactory.getLogger(ClsCallback.class);
+
+ private final Transaction tx;
+ private final ClsSinkContext context;
+ private final ProfileEvent event;
+ private final String topicId;
+
+ /**
+ * Constructor.
+ *
+ * @param tx Transaction
+ * @param context Context.
+ * @param event Related event.
+ */
+ public ClsCallback(Transaction tx, ClsSinkContext context, ProfileEvent event) {
+ this.tx = tx;
+ this.context = context;
+ this.event = event;
+ this.topicId = event.getHeaders().get(ClsSinkContext.KEY_TOPIC_ID);
+ }
+
+ @Override
+ public void onCompletion(Result result) {
+ if (!result.isSuccessful()) {
+ onFailed(result);
+ return;
+ }
+ onSuccess();
+ }
+
+ /**
+ * If send success.
+ */
+ private void onSuccess() {
+ context.addSendResultMetric(event, topicId, true, System.currentTimeMillis());
+ tx.commit();
+ tx.close();
+ }
+
+ /**
+ * If send failed.
+ *
+ * @param result Send result.
+ */
+ private void onFailed(Result result) {
+ tx.rollback();
+ tx.close();
+ LOG.error(result.toString());
+ context.addSendResultMetric(event, topicId, false, System.currentTimeMillis());
+ }
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsChannelWorker.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsChannelWorker.java
new file mode 100644
index 0000000..263d83f
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsChannelWorker.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.cls;
+
+import com.google.common.base.Preconditions;
+import com.tencentcloudapi.cls.producer.AsyncProducerClient;
+import com.tencentcloudapi.cls.producer.common.LogItem;
+import com.tencentcloudapi.cls.producer.errors.ProducerException;
+import org.apache.flume.Channel;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+import java.util.List;
+
+/**
+ * Cls channel worker.
+ */
+public class ClsChannelWorker extends Thread {
+ private static final Logger LOG = InlongLoggerFactory.getLogger(ClsChannelWorker.class);
+
+ private final ClsSinkContext context;
+ private final String workerName;
+ private final Channel channel;
+ private final IEvent2LogItemHandler handler;
+ private LifecycleState status;
+
+ /**
+ * Constructor.
+ *
+ * @param sinkName Sink name.
+ * @param context Cls context.
+ * @param workerIndex Index of cls channel worker.
+ */
+ public ClsChannelWorker(String sinkName, ClsSinkContext context, int workerIndex) {
+ this.context = Preconditions.checkNotNull(context);
+ this.workerName = sinkName + "-" + workerIndex;
+ this.channel = Preconditions.checkNotNull(context.getChannel());
+ this.handler = Preconditions.checkNotNull(context.getLogItemHandler());
+ this.status = LifecycleState.IDLE;
+ }
+
+ @Override
+ public void start() {
+ LOG.info("Start new cls channel worker {}", this.workerName);
+ status = LifecycleState.START;
+ super.start();
+ }
+
+ /**
+ * Close cls channel worker.
+ */
+ public void close() {
+ LOG.info("Close cls channel worker {}", this.workerName);
+ status = LifecycleState.STOP;
+ }
+
+ /**
+ * Run until status is STOP.
+ */
+ @Override
+ public void run() {
+ LOG.info("worker {} start to run, the state is {}", this.workerName, status.name());
+ while (status != LifecycleState.STOP) {
+ doRun();
+ }
+ }
+
+ /**
+ * Do run.
+ */
+ private void doRun() {
+ Transaction tx = null;
+ try {
+ tx = channel.getTransaction();
+ tx.begin();
+ Event rowEvent = channel.take();
+
+ // if event is null, close tx and sleep for a while.
+ if (rowEvent == null) {
+ this.commitTransaction(tx);
+ sleepOneInterval();
+ return;
+ }
+ // if is the instanceof ProfileEvent
+ if (!(rowEvent instanceof ProfileEvent)) {
+ this.commitTransaction(tx);
+ LOG.error("The type of row event is not compatible with ProfileEvent");
+ return;
+ }
+ // do send
+ this.send(rowEvent, tx);
+
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ this.rollbackTransaction(tx);
+ sleepOneInterval();
+ }
+ }
+
+ /**
+ * Send event to Cls
+ *
+ * @param rowEvent Row event.
+ * @param tx Transaction
+ * @throws ProducerException
+ * @throws InterruptedException
+ */
+ private void send(Event rowEvent, Transaction tx) throws ProducerException, InterruptedException {
+ ProfileEvent event = (ProfileEvent) rowEvent;
+ ClsIdConfig idConfig = context.getIdConfig(event.getUid());
+ event.getHeaders().put(ClsSinkContext.KEY_TOPIC_ID, idConfig.getTopicId());
+ AsyncProducerClient client = context.getClient(idConfig.getSecretId());
+ List<LogItem> record = handler.parse(context, event);
+ ClsCallback callback = new ClsCallback(tx, context, event);
+ client.putLogs(idConfig.getTopicId(), record, callback);
+ }
+
+ /** sleepOneInterval */
+ private void sleepOneInterval() {
+ try {
+ Thread.sleep(context.getProcessInterval());
+ } catch (InterruptedException e1) {
+ LOG.error(e1.getMessage(), e1);
+ }
+ }
+
+ /**
+ * Rollback transaction if it exists.
+ * @param tx Transaction
+ */
+ private void rollbackTransaction(Transaction tx) {
+ if (tx != null) {
+ tx.rollback();
+ tx.close();
+ }
+ }
+
+ /**
+ * Commit transaction if it exists.
+ * @param tx Transaction
+ */
+ private void commitTransaction(Transaction tx) {
+ if (tx != null) {
+ tx.commit();
+ tx.close();
+ }
+ }
+
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
index abcae88..04f67c3 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
@@ -17,7 +17,7 @@
package org.apache.inlong.sort.standalone.sink.cls;
-import lombok.Getter;
+import lombok.Data;
import java.util.ArrayList;
import java.util.Arrays;
@@ -26,7 +26,7 @@ import java.util.List;
/**
* Cls config of each uid.
*/
-@Getter
+@Data
public class ClsIdConfig {
private String inlongGroupId;
private String inlongStreamId;
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSink.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSink.java
new file mode 100644
index 0000000..cc1e976
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSink.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.cls;
+
+import org.apache.flume.Context;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Cls Sink implementation.
+ *
+ * <p>
+ * Response for initialization of {@link ClsChannelWorker}.
+ * </p>
+ */
+public class ClsSink extends AbstractSink implements Configurable {
+ private static final Logger LOG = LoggerFactory.getLogger(ClsSink.class);
+
+ private Context parentContext;
+ private ClsSinkContext context;
+ private List<ClsChannelWorker> workers;
+
+ /**
+ * Start {@link ClsChannelWorker}.
+ */
+ @Override
+ public void start() {
+ super.start();
+ try {
+ this.context = new ClsSinkContext(getName(), parentContext, getChannel());
+ this.context.start();
+ for (int i = 0; i < context.getMaxThreads(); i++) {
+ ClsChannelWorker worker = new ClsChannelWorker(getName(), context, i);
+ this.workers.add(worker);
+ worker.start();
+ }
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Stop {@link ClsChannelWorker}.
+ */
+ @Override
+ public void stop() {
+ super.stop();
+ try {
+ this.context.close();
+ for (ClsChannelWorker worker : this.workers) {
+ worker.close();
+ }
+ this.workers.clear();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Process.
+ * @return Status
+ * @throws EventDeliveryException
+ */
+ @Override
+ public Status process() throws EventDeliveryException {
+ return Status.BACKOFF;
+ }
+
+ /**
+ * Config parent context.
+ * @param context Parent context.
+ */
+ @Override
+ public void configure(Context context) {
+ LOG.info("start to configure:{}, context:{}.", this.getName(), context.toString());
+ this.parentContext = context;
+ }
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
index f068100..9872ab9 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
@@ -44,7 +44,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
/**
@@ -66,6 +65,7 @@ public class ClsSinkContext extends SinkContext {
private static final String KEY_MAX_RETRY_BACKOFF_MS = "maxRetryBackoffMs";
private static final String KEY_MAX_KEYWORD_LENGTH = "maxKeywordLength";
private static final String KEY_EVENT_LOG_ITEM_HANDLER = "logItemHandler";
+ public static final String KEY_TOPIC_ID = "topicId";
private static final int DEFAULT_KEYWORD_MAX_LENGTH = 32 * 1024 - 1;
private int keywordMaxLength = DEFAULT_KEYWORD_MAX_LENGTH;
@@ -74,9 +74,6 @@ public class ClsSinkContext extends SinkContext {
private List<AsyncProducerClient> deletingClients;
private Context sinkContext;
private Map<String, ClsIdConfig> idConfigMap = new ConcurrentHashMap<>();
- private AtomicLong offerCounter = new AtomicLong(0);
- private AtomicLong takeCounter = new AtomicLong(0);
- private AtomicLong backCounter = new AtomicLong(0);
private IEvent2LogItemHandler event2LogItemHandler;
/**
@@ -332,4 +329,14 @@ public class ClsSinkContext extends SinkContext {
public IEvent2LogItemHandler getLogItemHandler() {
return event2LogItemHandler;
}
+
+ /**
+ * Get cls client.
+ *
+ * @param secretId ID of client.
+ * @return Client instance.
+ */
+ public AsyncProducerClient getClient(String secretId) {
+ return clientMap.get(secretId);
+ }
}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.java
index 14d76b7..7ce0cf3 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.java
@@ -105,6 +105,8 @@ public class KafkaFederationWorker extends Thread {
continue;
}
if (!(rowEvent instanceof ProfileEvent)) {
+ tx.commit();
+ tx.close();
LOG.error("The type of row event is not compatible with ProfileEvent");
continue;
}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestClsIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestClsIdConfig.java
new file mode 100644
index 0000000..7e9cc8d
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestClsIdConfig.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.cls;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.List;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.management.*")
+public class TestClsIdConfig {
+
+ @Test
+ public void testGetFieldList() {
+ ClsIdConfig idConfig = new ClsIdConfig();
+ String testFieldName = "1 2 3 4 5 6 7";
+ idConfig.setFieldNames(testFieldName);
+ List<String> fieldList = idConfig.getFieldList();
+ Assert.assertEquals(7, fieldList.size());
+ }
+}
\ No newline at end of file
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java
new file mode 100644
index 0000000..211c8f0
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.cls;
+
+import com.tencentcloudapi.cls.producer.common.LogItem;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.utils.Constants;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.management.*")
+@PrepareForTest({ClsSinkContext.class, LogItem.class})
+public class TestDefaultEvent2LogItemHandler {
+
+ private ClsIdConfig idConfig;
+ private ProfileEvent event;
+ private DefaultEvent2LogItemHandler handler;
+ private ClsSinkContext mockContext;
+
+ @Before
+ public void setUp() {
+ idConfig = prepareIdConfig();
+ event = prepareEvent();
+ mockContext = PowerMockito.mock(ClsSinkContext.class);
+ handler = new DefaultEvent2LogItemHandler();
+ }
+
+ @Test
+ public void testNoIdConfig() {
+ Assert.assertNull(handler.parse(mockContext, event));
+ }
+
+ // @Test
+ public void testNormal() {
+ PowerMockito.when(mockContext.getIdConfig(Mockito.anyString())).thenReturn(idConfig);
+ PowerMockito.when(mockContext.getKeywordMaxLength()).thenReturn(8 * 1024);
+ List<LogItem> itemList = handler.parse(mockContext, event);
+ System.out.println(itemList.size());
+ }
+
+ private ClsIdConfig prepareIdConfig() {
+ ClsIdConfig config = new ClsIdConfig();
+ config.setFieldNames("f1 f2 f3 f4 f5 f6 f7 f8");
+ config.setInlongGroupId("testGroup");
+ config.setInlongStreamId("testStream");
+ config.setSecretId("testSecretId");
+ config.setSecretKey("testSecretKey");
+ config.setEndpoint("testEndPoint");
+ config.setTopicId("testTopicId");
+ return config;
+ }
+
+ private ProfileEvent prepareEvent() {
+ String str = "v1|v2|v3|v4|v5|v6|v7|v8";
+ final byte[] body = str.getBytes(StandardCharsets.UTF_8);
+ Map<String, String> headers = new HashMap<>();
+ headers.put(Constants.INLONG_GROUP_ID, "testGroup");
+ headers.put(Constants.INLONG_STREAM_ID, "testStream");
+ headers.put(Constants.HEADER_KEY_MSG_TIME, "1234456");
+ return new ProfileEvent(body, headers);
+ }
+
+}
\ No newline at end of file