You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ka...@apache.org on 2023/05/15 00:48:14 UTC
[rocketmq-streams] branch develop updated: Support flatmap function in GroupedStream
This is an automated email from the ASF dual-hosted git repository.
karp pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
The following commit(s) were added to refs/heads/develop by this push:
new b9e790cf Support flatmap function in GroupedStream
new a69d3411 Merge pull request #292 from Shuozeli/develop
b9e790cf is described below
commit b9e790cf93f6a78b5666a9a063e28d8ef5f9b49f
Author: Shuoze Li <sz...@gmail.com>
AuthorDate: Sun May 14 08:59:42 2023 -0700
Support flatmap function in GroupedStream
---
.../apache/rocketmq/streams/core/rstream/GroupedStream.java | 2 ++
.../rocketmq/streams/core/rstream/GroupedStreamImpl.java | 12 ++++++++++++
2 files changed, 14 insertions(+)
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStream.java b/core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStream.java
index dee6c8e5..e7526817 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStream.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStream.java
@@ -46,6 +46,8 @@ public interface GroupedStream<K, V> {
<OUT> GroupedStream<K, OUT> map(ValueMapperAction<V, OUT> valueMapperAction);
+ <VR> RStream<VR> flatMap(ValueMapperAction<V,? extends Iterable<? extends VR>> valueMapperAction);
+
<OUT> GroupedStream<K, OUT> aggregate(Accumulator<V, OUT> accumulator);
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStreamImpl.java b/core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStreamImpl.java
index b813aea6..71dd73da 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStreamImpl.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStreamImpl.java
@@ -25,6 +25,7 @@ import org.apache.rocketmq.streams.core.function.supplier.AccumulatorSupplier;
import org.apache.rocketmq.streams.core.function.supplier.AddTagSupplier;
import org.apache.rocketmq.streams.core.function.supplier.AggregateSupplier;
import org.apache.rocketmq.streams.core.function.supplier.FilterSupplier;
+import org.apache.rocketmq.streams.core.function.supplier.MultiValueChangeSupplier;
import org.apache.rocketmq.streams.core.function.supplier.SinkSupplier;
import org.apache.rocketmq.streams.core.function.supplier.SumAggregate;
import org.apache.rocketmq.streams.core.function.supplier.ValueChangeSupplier;
@@ -42,6 +43,7 @@ import java.util.function.Supplier;
import static org.apache.rocketmq.streams.core.util.OperatorNameMaker.FILTER_PREFIX;
import static org.apache.rocketmq.streams.core.util.OperatorNameMaker.COUNT_PREFIX;
import static org.apache.rocketmq.streams.core.util.OperatorNameMaker.ACCUMULATE_PREFIX;
+import static org.apache.rocketmq.streams.core.util.OperatorNameMaker.FLAT_MAP_PREFIX;
import static org.apache.rocketmq.streams.core.util.OperatorNameMaker.MAP_PREFIX;
import static org.apache.rocketmq.streams.core.util.OperatorNameMaker.MAX_PREFIX;
import static org.apache.rocketmq.streams.core.util.OperatorNameMaker.MIN_PREFIX;
@@ -186,6 +188,16 @@ public class GroupedStreamImpl<K, V> implements GroupedStream<K, V> {
return this.pipeline.addGroupedStreamVirtualNode(graphNode, parent);
}
+ @Override // flatMap: applies a mapper that returns an Iterable of VR for each value V
+ public <VR> RStream<VR> flatMap(ValueMapperAction<V, ? extends Iterable<? extends VR>> valueMapperAction) {
+ String name = OperatorNameMaker.makeName(FLAT_MAP_PREFIX, pipeline.getJobId()); // operator name made from the flat-map prefix and this pipeline's job id
+
+ MultiValueChangeSupplier<V, VR> changeSupplier = new MultiValueChangeSupplier<>(valueMapperAction); // wraps the caller's mapper; presumably emits each Iterable element downstream - confirm in MultiValueChangeSupplier
+ GraphNode graphNode = new ProcessorNode<>(name, parent.getName(), changeSupplier); // node is given its own name, the parent's name, and the supplier
+
+ return this.pipeline.addRStreamVirtualNode(graphNode, parent); // returns an RStream<VR>; the key type K is not part of the result type
+ }
+
@Override
public <OUT> GroupedStream<K, OUT> aggregate(Accumulator<V, OUT> accumulator) {
String name = OperatorNameMaker.makeName(ACCUMULATE_PREFIX, pipeline.getJobId());