You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by se...@apache.org on 2021/12/08 06:25:51 UTC
[rocketmq-streams] branch main updated: fix(example) fix file source example
This is an automated email from the ASF dual-hosted git repository.
seraph pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
The following commit(s) were added to refs/heads/main by this push:
new 0f84c2d fix(example) fix file source example
new 2b9d15b Merge pull request #96 from ni-ze/main
0f84c2d is described below
commit 0f84c2d035bcdf9763d9eff78a535b6ea3027e71
Author: 维章 <un...@gmail.com>
AuthorDate: Wed Dec 8 11:41:02 2021 +0800
fix(example) fix file source example
---
.../common/batchsystem/BatchFinishMessage.java | 23 ++++++++++++++--------
.../examples/filesource/FileSourceExample.java | 2 +-
2 files changed, 16 insertions(+), 9 deletions(-)
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/batchsystem/BatchFinishMessage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/batchsystem/BatchFinishMessage.java
index aa8d0a1..a7a9f84 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/batchsystem/BatchFinishMessage.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/batchsystem/BatchFinishMessage.java
@@ -22,21 +22,28 @@ import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.common.interfaces.ISystemMessage;
public class BatchFinishMessage implements ISystemMessage {
- public static String PRIMARY_KEY="__Primary_KEY";
+ public static String PRIMARY_KEY = "__Primary_KEY";
protected IMessage message;
- public BatchFinishMessage(IMessage message){
- this.message=message;
+
+ public BatchFinishMessage(IMessage message) {
+ this.message = message;
}
- public static IMessage create(){
- JSONObject msg=new JSONObject();
- msg.put(PRIMARY_KEY,true);
+
+ public static IMessage create() {
+ JSONObject msg = new JSONObject();
+ msg.put(PRIMARY_KEY, true);
return new Message(msg);
}
+
public static boolean isBatchFinishMessage(IMessage message) {
- return message.getMessageBody().getBooleanValue(PRIMARY_KEY);
+ try {
+ return message.getMessageBody().getBooleanValue(PRIMARY_KEY);
+ } catch (Throwable t) {
+ return false;
+ }
}
- public IMessage getMsg(){
+ public IMessage getMsg() {
return message;
}
}
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/filesource/FileSourceExample.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/filesource/FileSourceExample.java
index ced1653..3b667c0 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/filesource/FileSourceExample.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/filesource/FileSourceExample.java
@@ -22,7 +22,7 @@ import org.apache.rocketmq.streams.client.source.DataStreamSource;
public class FileSourceExample {
public static void main(String[] args) {
DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
- source.fromFile("/Users/test.sql", false)
+ source.fromFile("data.txt", false)
.map(message -> message)
.toPrint(1)
.start();