You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/10/12 08:51:46 UTC

[ignite-3] branch main updated: IGNITE-17553: Add flatMap function to Flow framework (#1180)

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 719904fe7a IGNITE-17553: Add flatMap function to Flow framework (#1180)
719904fe7a is described below

commit 719904fe7afd0f0d575e4f4b1ba56364a47f2b3f
Author: Ivan Gagarkin <ga...@gmail.com>
AuthorDate: Wed Oct 12 11:51:41 2022 +0300

    IGNITE-17553: Add flatMap function to Flow framework (#1180)
---
 .../internal/cli/call/connect/ConnectCall.java     |  12 +-
 .../questions/ConnectToClusterQuestion.java        |  23 ++--
 .../apache/ignite/internal/cli/core/flow/Flow.java |   2 +-
 .../ignite/internal/cli/core/flow/Flowable.java    |   2 +
 .../cli/core/flow/builder/FlowBuilder.java         |  10 ++
 .../cli/core/flow/builder/FlowBuilderImpl.java     |  17 ++-
 .../internal/cli/core/flow/builder/Flows.java      |   7 ++
 .../internal/cli/commands/flow/FlowTest.java       | 130 +++++++++++----------
 8 files changed, 115 insertions(+), 88 deletions(-)

diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/connect/ConnectCall.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/connect/ConnectCall.java
index cd65c5e27d..ceffeec1e8 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/connect/ConnectCall.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/connect/ConnectCall.java
@@ -55,10 +55,10 @@ public class ConnectCall implements Call<ConnectCallInput, String> {
     public CallOutput<String> execute(ConnectCallInput input) {
         try {
             String nodeUrl = input.getNodeUrl();
+            String configuration = fetchNodeConfiguration(nodeUrl);
+            session.setNodeName(fetchNodeName(nodeUrl));
             session.setNodeUrl(nodeUrl);
             stateConfigProvider.get().setProperty(ConfigConstants.LAST_CONNECTED_URL, nodeUrl);
-            session.setNodeName(fetchNodeName(input));
-            String configuration = fetchNodeConfiguration(input);
             session.setJdbcUrl(constructJdbcUrl(configuration, nodeUrl));
             session.setConnectedToNode(true);
 
@@ -70,12 +70,12 @@ public class ConnectCall implements Call<ConnectCallInput, String> {
         }
     }
 
-    private String fetchNodeName(ConnectCallInput input) throws ApiException {
-        return new NodeManagementApi(Configuration.getDefaultApiClient().setBasePath(input.getNodeUrl())).nodeState().getName();
+    private String fetchNodeName(String nodeUrl) throws ApiException {
+        return new NodeManagementApi(Configuration.getDefaultApiClient().setBasePath(nodeUrl)).nodeState().getName();
     }
 
-    private String fetchNodeConfiguration(ConnectCallInput input) throws ApiException {
-        return new NodeConfigurationApi(Configuration.getDefaultApiClient().setBasePath(input.getNodeUrl())).getNodeConfiguration();
+    private String fetchNodeConfiguration(String nodeUrl) throws ApiException {
+        return new NodeConfigurationApi(Configuration.getDefaultApiClient().setBasePath(nodeUrl)).getNodeConfiguration();
     }
 
     private String constructJdbcUrl(String configuration, String nodeUrl) {
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/questions/ConnectToClusterQuestion.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/questions/ConnectToClusterQuestion.java
index d8bb358c8d..7e0b8f1194 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/questions/ConnectToClusterQuestion.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/questions/ConnectToClusterQuestion.java
@@ -25,7 +25,6 @@ import org.apache.ignite.internal.cli.call.connect.ConnectCallInput;
 import org.apache.ignite.internal.cli.config.ConfigConstants;
 import org.apache.ignite.internal.cli.config.ConfigManagerProvider;
 import org.apache.ignite.internal.cli.config.StateConfigProvider;
-import org.apache.ignite.internal.cli.core.flow.Flowable;
 import org.apache.ignite.internal.cli.core.flow.builder.FlowBuilder;
 import org.apache.ignite.internal.cli.core.flow.builder.Flows;
 import org.apache.ignite.internal.cli.core.repl.Session;
@@ -66,16 +65,16 @@ public class ConnectToClusterQuestion {
         );
 
         return Flows.from(clusterUrlOrSessionNode(clusterUrl))
-                .ifThen(Objects::isNull, Flows.<String, ConnectCallInput>acceptQuestion(questionUiComponent,
-                                () -> new ConnectCallInput(defaultUrl))
-                        .then(Flows.fromCall(connectCall))
-                        .print()
-                        .build())
-                .then(prevUrl -> {
-                    // If inner flow from ifThen is interrupted we should interrupt outer flow as well.
-                    // TODO https://issues.apache.org/jira/browse/IGNITE-17553
-                    String url = clusterUrlOrSessionNode(clusterUrl);
-                    return url != null ? Flowable.success(url) : Flowable.interrupt();
+                .flatMap(v -> {
+                    if (Objects.isNull(v)) {
+                        return Flows.<String, ConnectCallInput>acceptQuestion(questionUiComponent,
+                                        () -> new ConnectCallInput(defaultUrl))
+                                .then(Flows.fromCall(connectCall))
+                                .print()
+                                .map(ignored -> clusterUrlOrSessionNode(clusterUrl));
+                    } else {
+                        return Flows.identity();
+                    }
                 });
     }
 
@@ -110,7 +109,7 @@ public class ConnectToClusterQuestion {
                 .print()
                 .ifThen(s -> !Objects.equals(clusterUrl, defaultUrl) && session.isConnectedToNode(),
                         defaultUrlQuestion(clusterUrl).print().build())
-                .build().start(Flowable.empty());
+                .start();
     }
 
     private FlowBuilder<String, String> defaultUrlQuestion(String lastConnectedUrl) {
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/Flow.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/Flow.java
index 90a94b55a7..586b1c2b81 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/Flow.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/Flow.java
@@ -36,7 +36,7 @@ public interface Flow<I, O> {
     /**
      * Flow composition method.
      *
-     * @param next flow which will be executed after currecnt instance with result of it.
+     * @param next flow which will be executed after the current instance, receiving its result as input.
      * @param <OT> new output type.
      * @return output flowable of {@code next}
      */
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/Flowable.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/Flowable.java
index 5aa59a4872..cd364f4b71 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/Flowable.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/Flowable.java
@@ -81,6 +81,8 @@ public interface Flowable<T> {
     static <T> Flowable<T> process(Supplier<T> supplier) {
         try {
             return success(supplier.get());
+        } catch (FlowInterruptException e) {
+            throw e;
         } catch (Exception e) {
             return failure(e);
         }
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/FlowBuilder.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/FlowBuilder.java
index a1e3b41b9b..f55c9d7099 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/FlowBuilder.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/FlowBuilder.java
@@ -54,6 +54,16 @@ public interface FlowBuilder<I, O>  {
         return then(Flows.mono(mapper));
     }
 
+    /**
+     * Returns a {@link FlowBuilder} consisting of the results of replacing each element
+     * of this flow with the contents of a mapped flow produced
+     * by applying the provided mapping function to each element.
+     *
+     * @param mapper function to apply to each element which produces a flow of new values
+     * @return the new {@link FlowBuilder}
+     */
+    <OT> FlowBuilder<I, OT> flatMap(Function<O, FlowBuilder<O, OT>> mapper);
+
     /**
      * Appends the flow to this builder if the result of the current flow matches the predicate.
      *
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/FlowBuilderImpl.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/FlowBuilderImpl.java
index 1d5e60ef86..ab66313ef3 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/FlowBuilderImpl.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/FlowBuilderImpl.java
@@ -69,6 +69,11 @@ public class FlowBuilderImpl<I, O> implements FlowBuilder<I, O> {
         return new FlowBuilderImpl<>(this.flow.composite(flow), exceptionHandlers, decoratorRegistry);
     }
 
+    @Override
+    public <OT> FlowBuilder<I, OT> flatMap(Function<O, FlowBuilder<O, OT>> mapper) {
+        return then(it -> mapper.apply(it.value()).build().start(it));
+    }
+
     @Override
     public <OT> FlowBuilder<I, O> ifThen(Predicate<O> tester, Flow<O, OT> flow) {
         return then(input -> {
@@ -114,7 +119,11 @@ public class FlowBuilderImpl<I, O> implements FlowBuilder<I, O> {
 
     @Override
     public void start() {
-        run(Flowable.empty());
+        try {
+            run(Flowable.empty());
+        } catch (FlowInterruptException ignored) {
+            // FlowInterruptException is an internal exception and shouldn't be exposed to users
+        }
     }
 
     /**
@@ -124,11 +133,7 @@ public class FlowBuilderImpl<I, O> implements FlowBuilder<I, O> {
      * @return output flowable
      */
     private Flowable<O> run(Flowable<I> input) {
-        try {
-            return flow.start(input);
-        } catch (FlowInterruptException e) {
-            return Flowable.empty();
-        }
+        return flow.start(input);
     }
 
     /**
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/Flows.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/Flows.java
index 0a1abfdbb6..2f98bfe9c2 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/Flows.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/Flows.java
@@ -153,4 +153,11 @@ public final class Flows {
     public static <I, O> FlowBuilder<I, O> acceptQuestion(QuestionUiComponent question, Supplier<O> onAccept) {
         return acceptQuestion(question.render(), onAccept);
     }
+
+    /**
+     * Returns a {@link FlowBuilder} that always returns its input argument.
+     */
+    public static <I> FlowBuilder<I, I> identity() {
+        return new FlowBuilderImpl<>(input -> Flowable.success(input.value()));
+    }
 }
diff --git a/modules/cli/src/test/java/org/apache/ignite/internal/cli/commands/flow/FlowTest.java b/modules/cli/src/test/java/org/apache/ignite/internal/cli/commands/flow/FlowTest.java
index ec554ad770..2267d6ac20 100644
--- a/modules/cli/src/test/java/org/apache/ignite/internal/cli/commands/flow/FlowTest.java
+++ b/modules/cli/src/test/java/org/apache/ignite/internal/cli/commands/flow/FlowTest.java
@@ -53,6 +53,24 @@ class FlowTest {
     private StringWriter out;
     private StringWriter errOut;
 
+    private static Flow<Object, Integer> createFlow() {
+        return askQuestion()
+                .question(s -> "Here is your number " + s + ":, would you like to multiply it by 2?",
+                        List.of(new QuestionAnswer<>("yes"::equals, (a, i) -> Integer.parseInt(i) * 2),
+                                new QuestionAnswer<>("no"::equals, (a, i) -> Integer.parseInt(i))))
+                .ifThen(num -> num == 1, Flows.fromCall(new IntCall(), IntCallInput::new))
+                .ifThen(num -> num > 1, Flows.fromCall(new StrCall(), integer -> new StrCallInput(String.valueOf(integer))))
+                .build();
+    }
+
+    private static FlowBuilder<Object, String> askQuestion() {
+        return Flows.question("Do you like this?",
+                        List.of(new QuestionAnswer<>("yes"::equals, (a, i) -> 1),
+                                new QuestionAnswer<>("no"::equals, (a, i) -> 2))
+                )
+                .map(String::valueOf);
+    }
+
     @BeforeEach
     public void setup() throws IOException {
         input = Files.createTempFile("input", "");
@@ -109,18 +127,12 @@ class FlowTest {
         // Given
         bindAnswers("no"); // we don't care about answer in this test
 
-        // When build flow
-        Flow<Object, String> flow = askQuestion()
+        // When the flow is built and started
+        askQuestion()
                 .exceptionHandler(new TestExceptionHandler())
                 .then(Flows.fromCall(new ThrowingStrCall(), StrCallInput::new))
                 .print()
-                .build();
-
-        // Then the output is empty
-        assertThat(errOut.toString(), emptyString());
-
-        // When start flow
-        flow.start(Flowable.empty());
+                .start();
 
         // Then the output equals the message from the exception because we use TestExceptionHandler
         assertThat(errOut.toString(), equalTo("Ooops!" + System.lineSeparator()));
@@ -132,18 +144,12 @@ class FlowTest {
         // Given
         bindAnswers("no"); // we don't care about answer in this test
 
-        // When build flow
-        Flow<Object, String> flow = askQuestion()
+        // When the flow is built and started
+        askQuestion()
                 .print()
                 .then(Flows.fromCall(new ThrowingStrCall(), StrCallInput::new))
                 .exceptionHandler(new TestExceptionHandler())
-                .build();
-
-        // Then the output is empty
-        assertThat(errOut.toString(), emptyString());
-
-        // When start flow
-        flow.start(Flowable.empty());
+                .start();
 
         // Then output is empty because print was used before the call
         assertThat(errOut.toString(), emptyString());
@@ -155,18 +161,12 @@ class FlowTest {
         // Given
         bindAnswers("no"); // we don't care about answer in this test
 
-        // When build flow
-        Flow<Object, String> flow = askQuestion()
+        // When the flow is built and started
+        askQuestion()
                 .then(Flows.fromCall(new ThrowingStrCall(), StrCallInput::new))
                 .exceptionHandler(new TestExceptionHandler())
                 .print()
-                .build();
-
-        // Then the output is empty
-        assertThat(errOut.toString(), emptyString());
-
-        // When start flow
-        flow.start(Flowable.empty());
+                .start();
 
         // Then the output equals the message from the exception because we use TestExceptionHandler
         assertThat(errOut.toString(), equalTo("Ooops!" + System.lineSeparator()));
@@ -177,22 +177,15 @@ class FlowTest {
         // Given
         bindAnswers("no");
 
-        // When build flow
-        Flow<Object, String> flow = askQuestion()
+        // When the flow is built and started
+        askQuestion()
                 .print()
                 .print()
-                .build();
-
-        // Then the output is empty
-        assertThat(out.toString(), emptyString());
-        assertThat(errOut.toString(), emptyString());
-
-        // When start flow
-        flow.start(Flowable.empty());
+                .start();
 
         // Then the output equals the 2 messages from the print operations
         assertThat(out.toString(), equalTo("2" + System.lineSeparator()
-                        + "2" + System.lineSeparator()));
+                + "2" + System.lineSeparator()));
         assertThat(errOut.toString(), emptyString());
     }
 
@@ -201,25 +194,18 @@ class FlowTest {
         // Given
         bindAnswers("no");
 
-        // When build flow
-        Flow<Object, String> flow = askQuestion()
+        // When the flow is built and started
+        askQuestion()
                 .then(Flows.fromCall(new ThrowingStrCall(), StrCallInput::new))
                 .exceptionHandler(new TestExceptionHandler())
                 .print()
                 .print()
-                .build();
-
-        // Then the output is empty
-        assertThat(out.toString(), emptyString());
-        assertThat(errOut.toString(), emptyString());
-
-        // When start flow
-        flow.start(Flowable.empty());
+                .start();
 
         // Then the error output equals the 2 messages from the exception handler
         assertThat(out.toString(), emptyString());
         assertThat(errOut.toString(), equalTo("Ooops!" + System.lineSeparator()
-                        + "Ooops!" + System.lineSeparator()));
+                + "Ooops!" + System.lineSeparator()));
     }
 
     @Test
@@ -269,24 +255,42 @@ class FlowTest {
         assertThat(errOut.toString(), emptyString());
     }
 
-    private static Flow<Object, Integer> createFlow() {
-        return askQuestion()
-                .question(s -> "Here is your number " + s + ":, would you like to multiply it by 2?",
-                        List.of(new QuestionAnswer<>("yes"::equals, (a, i) -> Integer.parseInt(i) * 2),
-                                new QuestionAnswer<>("no"::equals, (a, i) -> Integer.parseInt(i))))
-                .ifThen(num -> num == 1, Flows.fromCall(new IntCall(), IntCallInput::new))
-                .ifThen(num -> num > 1, Flows.fromCall(new StrCall(), integer -> new StrCallInput(String.valueOf(integer))))
-                .build();
+    @Test
+    void flatMap() {
+        Flows.from("fizz")
+                .flatMap(v -> Flows.from(it -> it + "buzz"))
+                .print()
+                .start();
+        assertThat(out.toString(), equalTo("fizzbuzz" + System.lineSeparator()));
     }
 
-    private static FlowBuilder<Object, String> askQuestion() {
-        return Flows.question("Do you like this?",
-                        List.of(new QuestionAnswer<>("yes"::equals, (a, i) -> 1),
-                                new QuestionAnswer<>("no"::equals, (a, i) -> 2))
-                )
-                .map(String::valueOf);
+    @Test
+    void interruptFlatMap() {
+        Flows.from("fizz")
+                .map(it -> it + "1")
+                .print()
+                .flatMap(v -> Flows.<String, String>from(ignored -> Flowable.interrupt()))
+                .print()
+                .map(it -> it + "2")
+                .print()
+                .start();
+        assertThat(out.toString(), equalTo("fizz1" + System.lineSeparator()));
+    }
+
+    @Test
+    void interruptThen() {
+        Flows.from("fizz")
+                .map(it -> it + "1")
+                .print()
+                .then(v -> Flowable.interrupt())
+                .print()
+                .map(it -> it + "2")
+                .print()
+                .start();
+        assertThat(out.toString(), equalTo("fizz1" + System.lineSeparator()));
     }
 
+
     private void bindAnswers(String... answers) throws IOException {
         Files.writeString(input, String.join("\n", answers) + "\n");
     }