You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ka...@apache.org on 2023/05/15 00:48:14 UTC

[rocketmq-streams] branch develop updated: Support flatmap function in GroupedStream

This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


The following commit(s) were added to refs/heads/develop by this push:
     new b9e790cf Support flatmap function in GroupedStream
     new a69d3411 Merge pull request #292 from Shuozeli/develop
b9e790cf is described below

commit b9e790cf93f6a78b5666a9a063e28d8ef5f9b49f
Author: Shuoze Li <sz...@gmail.com>
AuthorDate: Sun May 14 08:59:42 2023 -0700

    Support flatmap function in GroupedStream
---
 .../apache/rocketmq/streams/core/rstream/GroupedStream.java  |  2 ++
 .../rocketmq/streams/core/rstream/GroupedStreamImpl.java     | 12 ++++++++++++
 2 files changed, 14 insertions(+)

diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStream.java b/core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStream.java
index dee6c8e5..e7526817 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStream.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStream.java
@@ -46,6 +46,8 @@ public interface GroupedStream<K, V> {
 
     <OUT> GroupedStream<K, OUT> map(ValueMapperAction<V, OUT> valueMapperAction);
 
+    <VR> RStream<VR> flatMap(ValueMapperAction<V,? extends Iterable<? extends VR>> valueMapperAction);
+
 
     <OUT> GroupedStream<K, OUT> aggregate(Accumulator<V, OUT> accumulator);
 
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStreamImpl.java b/core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStreamImpl.java
index b813aea6..71dd73da 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStreamImpl.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStreamImpl.java
@@ -25,6 +25,7 @@ import org.apache.rocketmq.streams.core.function.supplier.AccumulatorSupplier;
 import org.apache.rocketmq.streams.core.function.supplier.AddTagSupplier;
 import org.apache.rocketmq.streams.core.function.supplier.AggregateSupplier;
 import org.apache.rocketmq.streams.core.function.supplier.FilterSupplier;
+import org.apache.rocketmq.streams.core.function.supplier.MultiValueChangeSupplier;
 import org.apache.rocketmq.streams.core.function.supplier.SinkSupplier;
 import org.apache.rocketmq.streams.core.function.supplier.SumAggregate;
 import org.apache.rocketmq.streams.core.function.supplier.ValueChangeSupplier;
@@ -42,6 +43,7 @@ import java.util.function.Supplier;
 import static org.apache.rocketmq.streams.core.util.OperatorNameMaker.FILTER_PREFIX;
 import static org.apache.rocketmq.streams.core.util.OperatorNameMaker.COUNT_PREFIX;
 import static org.apache.rocketmq.streams.core.util.OperatorNameMaker.ACCUMULATE_PREFIX;
+import static org.apache.rocketmq.streams.core.util.OperatorNameMaker.FLAT_MAP_PREFIX;
 import static org.apache.rocketmq.streams.core.util.OperatorNameMaker.MAP_PREFIX;
 import static org.apache.rocketmq.streams.core.util.OperatorNameMaker.MAX_PREFIX;
 import static org.apache.rocketmq.streams.core.util.OperatorNameMaker.MIN_PREFIX;
@@ -186,6 +188,16 @@ public class GroupedStreamImpl<K, V> implements GroupedStream<K, V> {
         return this.pipeline.addGroupedStreamVirtualNode(graphNode, parent);
     }
 
+    @Override
+    public <VR> RStream<VR> flatMap(ValueMapperAction<V, ? extends Iterable<? extends VR>> valueMapperAction) {
+        String name = OperatorNameMaker.makeName(FLAT_MAP_PREFIX, pipeline.getJobId());
+
+        MultiValueChangeSupplier<V, VR> changeSupplier = new MultiValueChangeSupplier<>(valueMapperAction);
+        GraphNode graphNode = new ProcessorNode<>(name, parent.getName(), changeSupplier);
+
+        return this.pipeline.addRStreamVirtualNode(graphNode, parent);
+    }
+
     @Override
     public <OUT> GroupedStream<K, OUT> aggregate(Accumulator<V, OUT> accumulator) {
         String name = OperatorNameMaker.makeName(ACCUMULATE_PREFIX, pipeline.getJobId());