You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by se...@apache.org on 2021/12/08 06:25:51 UTC

[rocketmq-streams] branch main updated: fix(example) fix file source example

This is an automated email from the ASF dual-hosted git repository.

seraph pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


The following commit(s) were added to refs/heads/main by this push:
     new 0f84c2d  fix(example) fix file source example
     new 2b9d15b  Merge pull request #96 from ni-ze/main
0f84c2d is described below

commit 0f84c2d035bcdf9763d9eff78a535b6ea3027e71
Author: 维章 <un...@gmail.com>
AuthorDate: Wed Dec 8 11:41:02 2021 +0800

    fix(example) fix file source example
---
 .../common/batchsystem/BatchFinishMessage.java     | 23 ++++++++++++++--------
 .../examples/filesource/FileSourceExample.java     |  2 +-
 2 files changed, 16 insertions(+), 9 deletions(-)

diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/batchsystem/BatchFinishMessage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/batchsystem/BatchFinishMessage.java
index aa8d0a1..a7a9f84 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/batchsystem/BatchFinishMessage.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/batchsystem/BatchFinishMessage.java
@@ -22,21 +22,28 @@ import org.apache.rocketmq.streams.common.context.Message;
 import org.apache.rocketmq.streams.common.interfaces.ISystemMessage;
 
 public class BatchFinishMessage implements ISystemMessage {
-    public static String PRIMARY_KEY="__Primary_KEY";
+    public static String PRIMARY_KEY = "__Primary_KEY";
     protected IMessage message;
-    public BatchFinishMessage(IMessage message){
-        this.message=message;
+
+    public BatchFinishMessage(IMessage message) {
+        this.message = message;
     }
-    public static IMessage create(){
-        JSONObject msg=new JSONObject();
-        msg.put(PRIMARY_KEY,true);
+
+    public static IMessage create() {
+        JSONObject msg = new JSONObject();
+        msg.put(PRIMARY_KEY, true);
         return new Message(msg);
     }
+
     public static boolean isBatchFinishMessage(IMessage message) {
-        return message.getMessageBody().getBooleanValue(PRIMARY_KEY);
+        try {
+            return message.getMessageBody().getBooleanValue(PRIMARY_KEY);
+        } catch (Throwable t) {
+            return false;
+        }
     }
 
-    public IMessage getMsg(){
+    public IMessage getMsg() {
         return message;
     }
 }
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/filesource/FileSourceExample.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/filesource/FileSourceExample.java
index ced1653..3b667c0 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/filesource/FileSourceExample.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/filesource/FileSourceExample.java
@@ -22,7 +22,7 @@ import org.apache.rocketmq.streams.client.source.DataStreamSource;
 public class FileSourceExample {
     public static void main(String[] args) {
         DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
-        source.fromFile("/Users/test.sql", false)
+        source.fromFile("data.txt", false)
             .map(message -> message)
             .toPrint(1)
             .start();