You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/10/15 11:19:21 UTC
[rocketmq-streams] branch main updated: [ISSUE #85] Recorrect example. (#84)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
The following commit(s) were added to refs/heads/main by this push:
new 715fdb5 [ISSUE #85] Recorrect example. (#84)
715fdb5 is described below
commit 715fdb545a67f47e42ae13626cffbbdd26027748
Author: Ni Ze <31...@users.noreply.github.com>
AuthorDate: Fri Oct 15 19:19:16 2021 +0800
[ISSUE #85] Recorrect example. (#84)
* recorrect example.
* add apache license in new file.
* modify illustrate.
Co-authored-by: nize <un...@gmail.com>
---
.../streams/common/datatype/HllDataType.java | 17 +++++++++
.../common/topology/stages/udf/UDFChainStage.java | 44 +++++++++++-----------
.../rocketmqsource/RocketMQSourceExample2.java | 4 +-
.../rocketmqsource/RocketMQSourceExample3.java | 4 +-
.../rocketmqsource/RocketmqWindowTest.java | 6 +--
.../rocketmq/streams/state/kv/TestLruState.java | 24 ++++++++++--
.../streams/state/kv/TestRocksdbState.java | 26 +++++++++++--
.../streams/window/fire/EventTimeManager.java | 12 ++++--
.../streams/window/shuffle/ShuffleChannel.java | 7 +++-
.../rocketmq/streams/storage/RocksdbTest.java | 30 ++++++++++++---
10 files changed, 127 insertions(+), 47 deletions(-)
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/datatype/HllDataType.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/datatype/HllDataType.java
index df5055f..419c46f 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/datatype/HllDataType.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/datatype/HllDataType.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.rocketmq.streams.common.datatype;
import com.alibaba.fastjson.JSONObject;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/UDFChainStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/UDFChainStage.java
index 6c12753..56aa804 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/UDFChainStage.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/UDFChainStage.java
@@ -73,27 +73,27 @@ public class UDFChainStage extends AbstractStatelessChainStage implements IAfter
loadLogFinger();
}
- @Override public IMessage doMessage(IMessage t, AbstractContext context) {
- if (filterByLogFingerprint(t)) {
- context.breakExecute();
- return null;
- }
- IStageHandle handle = selectHandle(t, context);
- if (handle == null) {
- return t;
- }
- IMessage result = handle.doMessage(t, context);
- if (!context.isContinue() || result == null) {
- if (context.get("NEED_USE_FINGER_PRINT") != null && Boolean.parseBoolean(context.get("NEED_USE_FINGER_PRINT").toString())) {
- sourceStage.addLogFingerprint(t);
- context.remove("NEED_USE_FINGER_PRINT");
- }
- return context.breakExecute();
- }
- if (context.get("NEED_USE_FINGER_PRINT") != null) {
- context.remove("NEED_USE_FINGER_PRINT");
- }
- return result;
- }
+// @Override public IMessage doMessage(IMessage t, AbstractContext context) {
+// if (filterByLogFingerprint(t)) {
+// context.breakExecute();
+// return null;
+// }
+// IStageHandle handle = selectHandle(t, context);
+// if (handle == null) {
+// return t;
+// }
+// IMessage result = handle.doMessage(t, context);
+// if (!context.isContinue() || result == null) {
+// if (context.get("NEED_USE_FINGER_PRINT") != null && Boolean.parseBoolean(context.get("NEED_USE_FINGER_PRINT").toString())) {
+// sourceStage.addLogFingerprint(t);
+// context.remove("NEED_USE_FINGER_PRINT");
+// }
+// return context.breakExecute();
+// }
+// if (context.get("NEED_USE_FINGER_PRINT") != null) {
+// context.remove("NEED_USE_FINGER_PRINT");
+// }
+// return result;
+// }
}
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
index 69c217b..5610b1f 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
@@ -27,7 +27,7 @@ import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_T
public class RocketMQSourceExample2 {
/**
- * 1、before run this case, make sure some data has already been rocketmq.
+ * 1、make sure your rocketmq server has been started.
*/
public static void main(String[] args) {
ProducerFromFile.produce("data.txt",NAMESRV_ADDRESS, RMQ_TOPIC);
@@ -54,7 +54,7 @@ public class RocketMQSourceExample2 {
.filter((value) -> {
System.out.println("filter: ===========");
String messageValue = (String)value;
- return !messageValue.contains("InFlow");
+ return messageValue.contains("InFlow");
})
.flatMap((message)->{
String value = (String) message;
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java
index 8411745..45a9216 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java
@@ -32,7 +32,7 @@ import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_T
public class RocketMQSourceExample3 {
/**
- * 1、before run this case, make sure some data has already been rocketmq.
+ * 1、make sure your rocketmq server has been started.
*/
public static void main(String[] args) {
ProducerFromFile.produce("data.txt",NAMESRV_ADDRESS, RMQ_TOPIC);
@@ -56,7 +56,7 @@ public class RocketMQSourceExample3 {
.map(message -> message)
.filter((value) -> {
String messageValue = (String) value;
- return !messageValue.contains("InFlow");
+ return messageValue.contains("InFlow");
})
.flatMap((message) -> {
JSONObject jsonObject = JSONObject.parseObject((String) message);
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketmqWindowTest.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketmqWindowTest.java
index 6dfdfb5..48e98c9 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketmqWindowTest.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketmqWindowTest.java
@@ -31,7 +31,7 @@ import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_T
public class RocketmqWindowTest {
/**
- * 1、before run this case, make sure some data has already been rocketmq.
+ * 1、make sure your rocketmq server has been started.
* 2、rocketmq allow create topic automatically.
*/
public static void main(String[] args) {
@@ -54,9 +54,9 @@ public class RocketmqWindowTest {
JSONObject.parseObject((String) message);
} catch (Throwable t) {
// if can not convert to json, discard it.because all operator are base on json.
- return true;
+ return false;
}
- return false;
+ return true;
})
//must convert message to json.
.map(message -> JSONObject.parseObject((String) message))
diff --git a/rocketmq-streams-state/src/test/java/org/apache/rocketmq/streams/state/kv/TestLruState.java b/rocketmq-streams-state/src/test/java/org/apache/rocketmq/streams/state/kv/TestLruState.java
index b7e43ae..e9470de 100644
--- a/rocketmq-streams-state/src/test/java/org/apache/rocketmq/streams/state/kv/TestLruState.java
+++ b/rocketmq-streams-state/src/test/java/org/apache/rocketmq/streams/state/kv/TestLruState.java
@@ -1,12 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.rocketmq.streams.state.kv;
+import org.apache.rocketmq.streams.state.LruState;
+import org.junit.Assert;
+import org.junit.Test;
+
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import org.apache.rocketmq.streams.state.LruState;
-import org.junit.Assert;
-import org.junit.Test;
public class TestLruState {
diff --git a/rocketmq-streams-state/src/test/java/org/apache/rocketmq/streams/state/kv/TestRocksdbState.java b/rocketmq-streams-state/src/test/java/org/apache/rocketmq/streams/state/kv/TestRocksdbState.java
index cf1a8b7..fb47fba 100644
--- a/rocketmq-streams-state/src/test/java/org/apache/rocketmq/streams/state/kv/TestRocksdbState.java
+++ b/rocketmq-streams-state/src/test/java/org/apache/rocketmq/streams/state/kv/TestRocksdbState.java
@@ -1,14 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.rocketmq.streams.state.kv;
+import org.apache.rocketmq.streams.state.kv.rocksdb.RocksdbState;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import org.apache.rocketmq.streams.state.kv.rocksdb.RocksdbState;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
/**
* @author arthur.liang
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/EventTimeManager.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/EventTimeManager.java
index 990b690..90adc50 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/EventTimeManager.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/EventTimeManager.java
@@ -16,16 +16,16 @@
*/
package org.apache.rocketmq.streams.window.fire;
-import java.util.HashMap;
-import java.util.Map;
-
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.topology.model.IWindow;
import org.apache.rocketmq.streams.window.operator.AbstractWindow;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
public class EventTimeManager {
private Map<String,SplitEventTimeManager> eventTimeManagerMap=new HashMap<>();
protected ISource source;
@@ -52,6 +52,10 @@ public class EventTimeManager {
SplitEventTimeManager splitEventTimeManager = eventTimeManagerMap.get(queueId);
if (splitEventTimeManager != null) {
Long currentMaxEventTime = splitEventTimeManager.getMaxEventTime();
+ if (currentMaxEventTime == null) {
+ return null;
+ }
+
if (eventTimeIncreasementMap.containsKey(queueId)) {
Long lastMaxEventTime = eventTimeIncreasementMap.get(queueId).getKey();
if (lastMaxEventTime.equals(currentMaxEventTime)) {
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
index 75d7bb1..6ff65c4 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
@@ -32,6 +32,7 @@ import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.context.Message;
+import org.apache.rocketmq.streams.common.context.MessageHeader;
import org.apache.rocketmq.streams.common.context.MessageOffset;
import org.apache.rocketmq.streams.common.interfaces.ISystemMessage;
import org.apache.rocketmq.streams.common.topology.ChainPipeline;
@@ -398,7 +399,11 @@ public class ShuffleChannel extends AbstractSystemChannel {
for (int i = 0; i < messages.size(); i++) {
JSONObject object = messages.getJSONObject(i);
groupByList.add(object.getString("SHUFFLE_KEY"));
- traceList.add(object.getJSONObject("MessageHeader").getString("traceId"));
+
+ MessageHeader messageHeader = object.getObject("MessageHeader", MessageHeader.class);
+ if (messageHeader != null) {
+ traceList.add(messageHeader.getTraceId());
+ }
}
String traceInfo = StringUtils.join(traceList);
String groupInfo = StringUtils.join(groupByList);
diff --git a/rocketmq-streams-window/src/test/java/org/apache/rocketmq/streams/storage/RocksdbTest.java b/rocketmq-streams-window/src/test/java/org/apache/rocketmq/streams/storage/RocksdbTest.java
index c708fdd..d639245 100644
--- a/rocketmq-streams-window/src/test/java/org/apache/rocketmq/streams/storage/RocksdbTest.java
+++ b/rocketmq-streams-window/src/test/java/org/apache/rocketmq/streams/storage/RocksdbTest.java
@@ -1,11 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.rocketmq.streams.storage;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.window.model.WindowInstance;
import org.apache.rocketmq.streams.window.operator.impl.SessionOperator;
@@ -16,6 +27,13 @@ import org.apache.rocketmq.streams.window.storage.rocksdb.RocksdbStorage;
import org.junit.Assert;
import org.junit.Test;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
public class RocksdbTest {
private static RocksdbStorage storage = new RocksdbStorage<>();