You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/10/15 11:19:21 UTC

[rocketmq-streams] branch main updated: [ISSUE #85] Recorrect example. (#84)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


The following commit(s) were added to refs/heads/main by this push:
     new 715fdb5  [ISSUE #85] Recorrect example. (#84)
715fdb5 is described below

commit 715fdb545a67f47e42ae13626cffbbdd26027748
Author: Ni Ze <31...@users.noreply.github.com>
AuthorDate: Fri Oct 15 19:19:16 2021 +0800

    [ISSUE #85] Recorrect example. (#84)
    
    * recorrect example.
    
    * add apache license in new file.
    
    * modify illustrate.
    
    Co-authored-by: nize <un...@gmail.com>
---
 .../streams/common/datatype/HllDataType.java       | 17 +++++++++
 .../common/topology/stages/udf/UDFChainStage.java  | 44 +++++++++++-----------
 .../rocketmqsource/RocketMQSourceExample2.java     |  4 +-
 .../rocketmqsource/RocketMQSourceExample3.java     |  4 +-
 .../rocketmqsource/RocketmqWindowTest.java         |  6 +--
 .../rocketmq/streams/state/kv/TestLruState.java    | 24 ++++++++++--
 .../streams/state/kv/TestRocksdbState.java         | 26 +++++++++++--
 .../streams/window/fire/EventTimeManager.java      | 12 ++++--
 .../streams/window/shuffle/ShuffleChannel.java     |  7 +++-
 .../rocketmq/streams/storage/RocksdbTest.java      | 30 ++++++++++++---
 10 files changed, 127 insertions(+), 47 deletions(-)

diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/datatype/HllDataType.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/datatype/HllDataType.java
index df5055f..419c46f 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/datatype/HllDataType.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/datatype/HllDataType.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.rocketmq.streams.common.datatype;
 
 import com.alibaba.fastjson.JSONObject;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/UDFChainStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/UDFChainStage.java
index 6c12753..56aa804 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/UDFChainStage.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/UDFChainStage.java
@@ -73,27 +73,27 @@ public class UDFChainStage extends AbstractStatelessChainStage implements IAfter
         loadLogFinger();
     }
 
-    @Override public IMessage doMessage(IMessage t, AbstractContext context) {
-        if (filterByLogFingerprint(t)) {
-            context.breakExecute();
-            return null;
-        }
-        IStageHandle handle = selectHandle(t, context);
-        if (handle == null) {
-            return t;
-        }
-        IMessage result = handle.doMessage(t, context);
-        if (!context.isContinue() || result == null) {
-            if (context.get("NEED_USE_FINGER_PRINT") != null && Boolean.parseBoolean(context.get("NEED_USE_FINGER_PRINT").toString())) {
-                sourceStage.addLogFingerprint(t);
-                context.remove("NEED_USE_FINGER_PRINT");
-            }
-            return context.breakExecute();
-        }
-        if (context.get("NEED_USE_FINGER_PRINT") != null) {
-            context.remove("NEED_USE_FINGER_PRINT");
-        }
-        return result;
-    }
+//    @Override public IMessage doMessage(IMessage t, AbstractContext context) {
+//        if (filterByLogFingerprint(t)) {
+//            context.breakExecute();
+//            return null;
+//        }
+//        IStageHandle handle = selectHandle(t, context);
+//        if (handle == null) {
+//            return t;
+//        }
+//        IMessage result = handle.doMessage(t, context);
+//        if (!context.isContinue() || result == null) {
+//            if (context.get("NEED_USE_FINGER_PRINT") != null && Boolean.parseBoolean(context.get("NEED_USE_FINGER_PRINT").toString())) {
+//                sourceStage.addLogFingerprint(t);
+//                context.remove("NEED_USE_FINGER_PRINT");
+//            }
+//            return context.breakExecute();
+//        }
+//        if (context.get("NEED_USE_FINGER_PRINT") != null) {
+//            context.remove("NEED_USE_FINGER_PRINT");
+//        }
+//        return result;
+//    }
 
 }
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
index 69c217b..5610b1f 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
@@ -27,7 +27,7 @@ import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_T
 
 public class RocketMQSourceExample2 {
     /**
-     * 1、before run this case, make sure some data has already been rocketmq.
+     * 1、make sure your rocketmq server has been started.
      */
     public static void main(String[] args) {
         ProducerFromFile.produce("data.txt",NAMESRV_ADDRESS, RMQ_TOPIC);
@@ -54,7 +54,7 @@ public class RocketMQSourceExample2 {
                 .filter((value) -> {
                     System.out.println("filter: ===========");
                     String messageValue = (String)value;
-                    return !messageValue.contains("InFlow");
+                    return messageValue.contains("InFlow");
                 })
                 .flatMap((message)->{
                     String value = (String) message;
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java
index 8411745..45a9216 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java
@@ -32,7 +32,7 @@ import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_T
 
 public class RocketMQSourceExample3 {
     /**
-     * 1、before run this case, make sure some data has already been rocketmq.
+     * 1、make sure your rocketmq server has been started.
      */
     public static void main(String[] args) {
         ProducerFromFile.produce("data.txt",NAMESRV_ADDRESS, RMQ_TOPIC);
@@ -56,7 +56,7 @@ public class RocketMQSourceExample3 {
                 .map(message -> message)
                 .filter((value) -> {
                     String messageValue = (String) value;
-                    return !messageValue.contains("InFlow");
+                    return messageValue.contains("InFlow");
                 })
                 .flatMap((message) -> {
                     JSONObject jsonObject = JSONObject.parseObject((String) message);
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketmqWindowTest.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketmqWindowTest.java
index 6dfdfb5..48e98c9 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketmqWindowTest.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketmqWindowTest.java
@@ -31,7 +31,7 @@ import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_T
 public class RocketmqWindowTest {
 
     /**
-     * 1、before run this case, make sure some data has already been rocketmq.
+     * 1、make sure your rocketmq server has been started.
      * 2、rocketmq allow create topic automatically.
      */
     public static void main(String[] args) {
@@ -54,9 +54,9 @@ public class RocketmqWindowTest {
                         JSONObject.parseObject((String) message);
                     } catch (Throwable t) {
                         // if can not convert to json, discard it.because all operator are base on json.
-                        return true;
+                        return false;
                     }
-                    return false;
+                    return true;
                 })
                 //must convert message to json.
                 .map(message -> JSONObject.parseObject((String) message))
diff --git a/rocketmq-streams-state/src/test/java/org/apache/rocketmq/streams/state/kv/TestLruState.java b/rocketmq-streams-state/src/test/java/org/apache/rocketmq/streams/state/kv/TestLruState.java
index b7e43ae..e9470de 100644
--- a/rocketmq-streams-state/src/test/java/org/apache/rocketmq/streams/state/kv/TestLruState.java
+++ b/rocketmq-streams-state/src/test/java/org/apache/rocketmq/streams/state/kv/TestLruState.java
@@ -1,12 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.rocketmq.streams.state.kv;
 
+import org.apache.rocketmq.streams.state.LruState;
+import org.junit.Assert;
+import org.junit.Test;
+
 import java.util.Iterator;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import org.apache.rocketmq.streams.state.LruState;
-import org.junit.Assert;
-import org.junit.Test;
 
 public class TestLruState {
 
diff --git a/rocketmq-streams-state/src/test/java/org/apache/rocketmq/streams/state/kv/TestRocksdbState.java b/rocketmq-streams-state/src/test/java/org/apache/rocketmq/streams/state/kv/TestRocksdbState.java
index cf1a8b7..fb47fba 100644
--- a/rocketmq-streams-state/src/test/java/org/apache/rocketmq/streams/state/kv/TestRocksdbState.java
+++ b/rocketmq-streams-state/src/test/java/org/apache/rocketmq/streams/state/kv/TestRocksdbState.java
@@ -1,14 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.rocketmq.streams.state.kv;
 
+import org.apache.rocketmq.streams.state.kv.rocksdb.RocksdbState;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import org.apache.rocketmq.streams.state.kv.rocksdb.RocksdbState;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
 
 /**
  * @author arthur.liang
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/EventTimeManager.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/EventTimeManager.java
index 990b690..90adc50 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/EventTimeManager.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/EventTimeManager.java
@@ -16,16 +16,16 @@
  */
 package org.apache.rocketmq.streams.window.fire;
 
-import java.util.HashMap;
-import java.util.Map;
-
-import java.util.concurrent.ConcurrentHashMap;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.rocketmq.streams.common.channel.source.ISource;
 import org.apache.rocketmq.streams.common.context.IMessage;
 import org.apache.rocketmq.streams.common.topology.model.IWindow;
 import org.apache.rocketmq.streams.window.operator.AbstractWindow;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
 public class EventTimeManager {
     private Map<String,SplitEventTimeManager> eventTimeManagerMap=new HashMap<>();
     protected ISource source;
@@ -52,6 +52,10 @@ public class EventTimeManager {
         SplitEventTimeManager splitEventTimeManager = eventTimeManagerMap.get(queueId);
         if (splitEventTimeManager != null) {
             Long currentMaxEventTime = splitEventTimeManager.getMaxEventTime();
+            if (currentMaxEventTime == null) {
+                return null;
+            }
+
             if (eventTimeIncreasementMap.containsKey(queueId)) {
                 Long lastMaxEventTime = eventTimeIncreasementMap.get(queueId).getKey();
                 if (lastMaxEventTime.equals(currentMaxEventTime)) {
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
index 75d7bb1..6ff65c4 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
@@ -32,6 +32,7 @@ import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
 import org.apache.rocketmq.streams.common.context.AbstractContext;
 import org.apache.rocketmq.streams.common.context.IMessage;
 import org.apache.rocketmq.streams.common.context.Message;
+import org.apache.rocketmq.streams.common.context.MessageHeader;
 import org.apache.rocketmq.streams.common.context.MessageOffset;
 import org.apache.rocketmq.streams.common.interfaces.ISystemMessage;
 import org.apache.rocketmq.streams.common.topology.ChainPipeline;
@@ -398,7 +399,11 @@ public class ShuffleChannel extends AbstractSystemChannel {
             for (int i = 0; i < messages.size(); i++) {
                 JSONObject object = messages.getJSONObject(i);
                 groupByList.add(object.getString("SHUFFLE_KEY"));
-                traceList.add(object.getJSONObject("MessageHeader").getString("traceId"));
+
+                MessageHeader messageHeader = object.getObject("MessageHeader", MessageHeader.class);
+                if (messageHeader != null) {
+                    traceList.add(messageHeader.getTraceId());
+                }
             }
             String traceInfo = StringUtils.join(traceList);
             String groupInfo = StringUtils.join(groupByList);
diff --git a/rocketmq-streams-window/src/test/java/org/apache/rocketmq/streams/storage/RocksdbTest.java b/rocketmq-streams-window/src/test/java/org/apache/rocketmq/streams/storage/RocksdbTest.java
index c708fdd..d639245 100644
--- a/rocketmq-streams-window/src/test/java/org/apache/rocketmq/streams/storage/RocksdbTest.java
+++ b/rocketmq-streams-window/src/test/java/org/apache/rocketmq/streams/storage/RocksdbTest.java
@@ -1,11 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.rocketmq.streams.storage;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
 import org.apache.rocketmq.streams.window.model.WindowInstance;
 import org.apache.rocketmq.streams.window.operator.impl.SessionOperator;
@@ -16,6 +27,13 @@ import org.apache.rocketmq.streams.window.storage.rocksdb.RocksdbStorage;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 public class RocksdbTest {
 
     private static RocksdbStorage storage = new RocksdbStorage<>();