You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/01/04 01:47:55 UTC

[GitHub] [incubator-inlong] baomingyu commented on a change in pull request #2091: [INLONG-2075] TDSDK Source of DataProxy support new Message format.

baomingyu commented on a change in pull request #2091:
URL: https://github.com/apache/incubator-inlong/pull/2091#discussion_r777783682



##########
File path: inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
##########
@@ -408,49 +417,72 @@ private void formatMessagesAndSend(Map<String, String> commonAttrMap,
 
                     StringBuilder sidBuilder = new StringBuilder();
                     sidBuilder.append(topicEntry.getKey()).append(SEPARATOR).append(streamIdEntry.getKey())
-                        .append(SEPARATOR).append(sequenceId);
+                            .append(SEPARATOR).append(sequenceId);
                     headers.put(ConfigConstants.SEQUENCE_ID, sidBuilder.toString());
                 }
 
                 headers.put(ConfigConstants.PKG_TIME_KEY, pkgTimeStr);
-                Event event = EventBuilder.withBody(data, headers);
 
-                long dtten = 0;
-                try {
-                    dtten = Long.parseLong(headers.get(AttributeConstants.DATA_TIME));
-                } catch (Exception e1) {
-                    long uniqVal = Long.parseLong(commonAttrMap.get(AttributeConstants.UNIQ_ID));
-                    throw new MessageIDException(uniqVal,
-                        ErrorCode.DT_ERROR,
-                        new Throwable("attribute dt=" + headers.get(AttributeConstants.DATA_TIME
-                            + " has error, detail is: topic=" + topicEntry.getKey() + "&streamId="
-                            + streamIdEntry.getKey() + "&NodeIP=" + strRemoteIP), e1));
-                }
+                // process proxy message list
+                this.processProxyMessageList(headers, streamIdEntry.getValue());

Review comment:
        the message of tdmsg format is deleted?  the tdmsg format needs to be retained. And new message format need add tow new file for messagehandler and sink.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org