You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/10/12 08:51:46 UTC
[ignite-3] branch main updated: IGNITE-17553: Add flatMap function to Flow framework (#1180)
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 719904fe7a IGNITE-17553: Add flatMap function to Flow framework (#1180)
719904fe7a is described below
commit 719904fe7afd0f0d575e4f4b1ba56364a47f2b3f
Author: Ivan Gagarkin <ga...@gmail.com>
AuthorDate: Wed Oct 12 11:51:41 2022 +0300
IGNITE-17553: Add flatMap function to Flow framework (#1180)
---
.../internal/cli/call/connect/ConnectCall.java | 12 +-
.../questions/ConnectToClusterQuestion.java | 23 ++--
.../apache/ignite/internal/cli/core/flow/Flow.java | 2 +-
.../ignite/internal/cli/core/flow/Flowable.java | 2 +
.../cli/core/flow/builder/FlowBuilder.java | 10 ++
.../cli/core/flow/builder/FlowBuilderImpl.java | 17 ++-
.../internal/cli/core/flow/builder/Flows.java | 7 ++
.../internal/cli/commands/flow/FlowTest.java | 130 +++++++++++----------
8 files changed, 115 insertions(+), 88 deletions(-)
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/connect/ConnectCall.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/connect/ConnectCall.java
index cd65c5e27d..ceffeec1e8 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/connect/ConnectCall.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/connect/ConnectCall.java
@@ -55,10 +55,10 @@ public class ConnectCall implements Call<ConnectCallInput, String> {
public CallOutput<String> execute(ConnectCallInput input) {
try {
String nodeUrl = input.getNodeUrl();
+ String configuration = fetchNodeConfiguration(nodeUrl);
+ session.setNodeName(fetchNodeName(nodeUrl));
session.setNodeUrl(nodeUrl);
stateConfigProvider.get().setProperty(ConfigConstants.LAST_CONNECTED_URL, nodeUrl);
- session.setNodeName(fetchNodeName(input));
- String configuration = fetchNodeConfiguration(input);
session.setJdbcUrl(constructJdbcUrl(configuration, nodeUrl));
session.setConnectedToNode(true);
@@ -70,12 +70,12 @@ public class ConnectCall implements Call<ConnectCallInput, String> {
}
}
- private String fetchNodeName(ConnectCallInput input) throws ApiException {
- return new NodeManagementApi(Configuration.getDefaultApiClient().setBasePath(input.getNodeUrl())).nodeState().getName();
+ private String fetchNodeName(String nodeUrl) throws ApiException {
+ return new NodeManagementApi(Configuration.getDefaultApiClient().setBasePath(nodeUrl)).nodeState().getName();
}
- private String fetchNodeConfiguration(ConnectCallInput input) throws ApiException {
- return new NodeConfigurationApi(Configuration.getDefaultApiClient().setBasePath(input.getNodeUrl())).getNodeConfiguration();
+ private String fetchNodeConfiguration(String nodeUrl) throws ApiException {
+ return new NodeConfigurationApi(Configuration.getDefaultApiClient().setBasePath(nodeUrl)).getNodeConfiguration();
}
private String constructJdbcUrl(String configuration, String nodeUrl) {
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/questions/ConnectToClusterQuestion.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/questions/ConnectToClusterQuestion.java
index d8bb358c8d..7e0b8f1194 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/questions/ConnectToClusterQuestion.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/questions/ConnectToClusterQuestion.java
@@ -25,7 +25,6 @@ import org.apache.ignite.internal.cli.call.connect.ConnectCallInput;
import org.apache.ignite.internal.cli.config.ConfigConstants;
import org.apache.ignite.internal.cli.config.ConfigManagerProvider;
import org.apache.ignite.internal.cli.config.StateConfigProvider;
-import org.apache.ignite.internal.cli.core.flow.Flowable;
import org.apache.ignite.internal.cli.core.flow.builder.FlowBuilder;
import org.apache.ignite.internal.cli.core.flow.builder.Flows;
import org.apache.ignite.internal.cli.core.repl.Session;
@@ -66,16 +65,16 @@ public class ConnectToClusterQuestion {
);
return Flows.from(clusterUrlOrSessionNode(clusterUrl))
- .ifThen(Objects::isNull, Flows.<String, ConnectCallInput>acceptQuestion(questionUiComponent,
- () -> new ConnectCallInput(defaultUrl))
- .then(Flows.fromCall(connectCall))
- .print()
- .build())
- .then(prevUrl -> {
- // If inner flow from ifThen is interrupted we should interrupt outer flow as well.
- // TODO https://issues.apache.org/jira/browse/IGNITE-17553
- String url = clusterUrlOrSessionNode(clusterUrl);
- return url != null ? Flowable.success(url) : Flowable.interrupt();
+ .flatMap(v -> {
+ if (Objects.isNull(v)) {
+ return Flows.<String, ConnectCallInput>acceptQuestion(questionUiComponent,
+ () -> new ConnectCallInput(defaultUrl))
+ .then(Flows.fromCall(connectCall))
+ .print()
+ .map(ignored -> clusterUrlOrSessionNode(clusterUrl));
+ } else {
+ return Flows.identity();
+ }
});
}
@@ -110,7 +109,7 @@ public class ConnectToClusterQuestion {
.print()
.ifThen(s -> !Objects.equals(clusterUrl, defaultUrl) && session.isConnectedToNode(),
defaultUrlQuestion(clusterUrl).print().build())
- .build().start(Flowable.empty());
+ .start();
}
private FlowBuilder<String, String> defaultUrlQuestion(String lastConnectedUrl) {
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/Flow.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/Flow.java
index 90a94b55a7..586b1c2b81 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/Flow.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/Flow.java
@@ -36,7 +36,7 @@ public interface Flow<I, O> {
/**
* Flow composition method.
*
- * @param next flow which will be executed after currecnt instance with result of it.
+ * @param next flow which will be executed after current instance with result of it.
* @param <OT> new output type.
* @return output flowable of {@param next}
*/
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/Flowable.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/Flowable.java
index 5aa59a4872..cd364f4b71 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/Flowable.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/Flowable.java
@@ -81,6 +81,8 @@ public interface Flowable<T> {
static <T> Flowable<T> process(Supplier<T> supplier) {
try {
return success(supplier.get());
+ } catch (FlowInterruptException e) {
+ throw e;
} catch (Exception e) {
return failure(e);
}
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/FlowBuilder.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/FlowBuilder.java
index a1e3b41b9b..f55c9d7099 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/FlowBuilder.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/FlowBuilder.java
@@ -54,6 +54,16 @@ public interface FlowBuilder<I, O> {
return then(Flows.mono(mapper));
}
+ /**
+ * Returns a {@link FlowBuilder} consisting of the results of replacing each element
+ * of this flow with the contents of a mapped flow produced
+ * by applying the provided mapping function to each element.
+ *
+ * @param mapper function to apply to each element which produces a flow of new values
+ * @return the new {@link FlowBuilder}
+ */
+ <OT> FlowBuilder<I, OT> flatMap(Function<O, FlowBuilder<O, OT>> mapper);
+
/**
* Appends the flow to this builder if the result of the current flow matches the predicate.
*
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/FlowBuilderImpl.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/FlowBuilderImpl.java
index 1d5e60ef86..ab66313ef3 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/FlowBuilderImpl.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/FlowBuilderImpl.java
@@ -69,6 +69,11 @@ public class FlowBuilderImpl<I, O> implements FlowBuilder<I, O> {
return new FlowBuilderImpl<>(this.flow.composite(flow), exceptionHandlers, decoratorRegistry);
}
+ @Override
+ public <OT> FlowBuilder<I, OT> flatMap(Function<O, FlowBuilder<O, OT>> mapper) {
+ return then(it -> mapper.apply(it.value()).build().start(it));
+ }
+
@Override
public <OT> FlowBuilder<I, O> ifThen(Predicate<O> tester, Flow<O, OT> flow) {
return then(input -> {
@@ -114,7 +119,11 @@ public class FlowBuilderImpl<I, O> implements FlowBuilder<I, O> {
@Override
public void start() {
- run(Flowable.empty());
+ try {
+ run(Flowable.empty());
+ } catch (FlowInterruptException ignored) {
+ // FlowInterruptException is an internal exception and shouldn't be exposed to users
+ }
}
/**
@@ -124,11 +133,7 @@ public class FlowBuilderImpl<I, O> implements FlowBuilder<I, O> {
* @return output flowable
*/
private Flowable<O> run(Flowable<I> input) {
- try {
- return flow.start(input);
- } catch (FlowInterruptException e) {
- return Flowable.empty();
- }
+ return flow.start(input);
}
/**
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/Flows.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/Flows.java
index 0a1abfdbb6..2f98bfe9c2 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/Flows.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/Flows.java
@@ -153,4 +153,11 @@ public final class Flows {
public static <I, O> FlowBuilder<I, O> acceptQuestion(QuestionUiComponent question, Supplier<O> onAccept) {
return acceptQuestion(question.render(), onAccept);
}
+
+ /**
+ * Returns a {@link FlowBuilder} that always returns its input argument.
+ */
+ public static <I> FlowBuilder<I, I> identity() {
+ return new FlowBuilderImpl<>(input -> Flowable.success(input.value()));
+ }
}
diff --git a/modules/cli/src/test/java/org/apache/ignite/internal/cli/commands/flow/FlowTest.java b/modules/cli/src/test/java/org/apache/ignite/internal/cli/commands/flow/FlowTest.java
index ec554ad770..2267d6ac20 100644
--- a/modules/cli/src/test/java/org/apache/ignite/internal/cli/commands/flow/FlowTest.java
+++ b/modules/cli/src/test/java/org/apache/ignite/internal/cli/commands/flow/FlowTest.java
@@ -53,6 +53,24 @@ class FlowTest {
private StringWriter out;
private StringWriter errOut;
+ private static Flow<Object, Integer> createFlow() {
+ return askQuestion()
+ .question(s -> "Here is your number " + s + ":, would you like to multiply it by 2?",
+ List.of(new QuestionAnswer<>("yes"::equals, (a, i) -> Integer.parseInt(i) * 2),
+ new QuestionAnswer<>("no"::equals, (a, i) -> Integer.parseInt(i))))
+ .ifThen(num -> num == 1, Flows.fromCall(new IntCall(), IntCallInput::new))
+ .ifThen(num -> num > 1, Flows.fromCall(new StrCall(), integer -> new StrCallInput(String.valueOf(integer))))
+ .build();
+ }
+
+ private static FlowBuilder<Object, String> askQuestion() {
+ return Flows.question("Do you like this?",
+ List.of(new QuestionAnswer<>("yes"::equals, (a, i) -> 1),
+ new QuestionAnswer<>("no"::equals, (a, i) -> 2))
+ )
+ .map(String::valueOf);
+ }
+
@BeforeEach
public void setup() throws IOException {
input = Files.createTempFile("input", "");
@@ -109,18 +127,12 @@ class FlowTest {
// Given
bindAnswers("no"); // we don't care about answer in this test
- // When build flow
- Flow<Object, String> flow = askQuestion()
+ // When build flow and start
+ askQuestion()
.exceptionHandler(new TestExceptionHandler())
.then(Flows.fromCall(new ThrowingStrCall(), StrCallInput::new))
.print()
- .build();
-
- // Then the output is empty
- assertThat(errOut.toString(), emptyString());
-
- // When start flow
- flow.start(Flowable.empty());
+ .start();
// Then output equals to the message from the exception because we use TestExceptionHandler
assertThat(errOut.toString(), equalTo("Ooops!" + System.lineSeparator()));
@@ -132,18 +144,12 @@ class FlowTest {
// Given
bindAnswers("no"); // we don't care about answer in this test
- // When build flow
- Flow<Object, String> flow = askQuestion()
+ // When build flow and start
+ askQuestion()
.print()
.then(Flows.fromCall(new ThrowingStrCall(), StrCallInput::new))
.exceptionHandler(new TestExceptionHandler())
- .build();
-
- // Then the output is empty
- assertThat(errOut.toString(), emptyString());
-
- // When start flow
- flow.start(Flowable.empty());
+ .start();
// Then output is empty because print was used before the call
assertThat(errOut.toString(), emptyString());
@@ -155,18 +161,12 @@ class FlowTest {
// Given
bindAnswers("no"); // we don't care about answer in this test
- // When build flow
- Flow<Object, String> flow = askQuestion()
+ // When build flow and start
+ askQuestion()
.then(Flows.fromCall(new ThrowingStrCall(), StrCallInput::new))
.exceptionHandler(new TestExceptionHandler())
.print()
- .build();
-
- // Then the output is empty
- assertThat(errOut.toString(), emptyString());
-
- // When start flow
- flow.start(Flowable.empty());
+ .start();
// Then output equals to the message from the exception because we use TestExceptionHandler
assertThat(errOut.toString(), equalTo("Ooops!" + System.lineSeparator()));
@@ -177,22 +177,15 @@ class FlowTest {
// Given
bindAnswers("no");
- // When build flow
- Flow<Object, String> flow = askQuestion()
+ // When build flow and start
+ askQuestion()
.print()
.print()
- .build();
-
- // Then the output is empty
- assertThat(out.toString(), emptyString());
- assertThat(errOut.toString(), emptyString());
-
- // When start flow
- flow.start(Flowable.empty());
+ .start();
// Then output equals to 2 messages from print operations
assertThat(out.toString(), equalTo("2" + System.lineSeparator()
- + "2" + System.lineSeparator()));
+ + "2" + System.lineSeparator()));
assertThat(errOut.toString(), emptyString());
}
@@ -201,25 +194,18 @@ class FlowTest {
// Given
bindAnswers("no");
- // When build flow
- Flow<Object, String> flow = askQuestion()
+ // When build flow and start
+ askQuestion()
.then(Flows.fromCall(new ThrowingStrCall(), StrCallInput::new))
.exceptionHandler(new TestExceptionHandler())
.print()
.print()
- .build();
-
- // Then the output is empty
- assertThat(out.toString(), emptyString());
- assertThat(errOut.toString(), emptyString());
-
- // When start flow
- flow.start(Flowable.empty());
+ .start();
// Then error output equals to 2 messages from exception handler
assertThat(out.toString(), emptyString());
assertThat(errOut.toString(), equalTo("Ooops!" + System.lineSeparator()
- + "Ooops!" + System.lineSeparator()));
+ + "Ooops!" + System.lineSeparator()));
}
@Test
@@ -269,24 +255,42 @@ class FlowTest {
assertThat(errOut.toString(), emptyString());
}
- private static Flow<Object, Integer> createFlow() {
- return askQuestion()
- .question(s -> "Here is your number " + s + ":, would you like to multiply it by 2?",
- List.of(new QuestionAnswer<>("yes"::equals, (a, i) -> Integer.parseInt(i) * 2),
- new QuestionAnswer<>("no"::equals, (a, i) -> Integer.parseInt(i))))
- .ifThen(num -> num == 1, Flows.fromCall(new IntCall(), IntCallInput::new))
- .ifThen(num -> num > 1, Flows.fromCall(new StrCall(), integer -> new StrCallInput(String.valueOf(integer))))
- .build();
+ @Test
+ void flatMap() {
+ Flows.from("fizz")
+ .flatMap(v -> Flows.from(it -> it + "buzz"))
+ .print()
+ .start();
+ assertThat(out.toString(), equalTo("fizzbuzz" + System.lineSeparator()));
}
- private static FlowBuilder<Object, String> askQuestion() {
- return Flows.question("Do you like this?",
- List.of(new QuestionAnswer<>("yes"::equals, (a, i) -> 1),
- new QuestionAnswer<>("no"::equals, (a, i) -> 2))
- )
- .map(String::valueOf);
+ @Test
+ void interruptFlatMap() {
+ Flows.from("fizz")
+ .map(it -> it + "1")
+ .print()
+ .flatMap(v -> Flows.<String, String>from(ignored -> Flowable.interrupt()))
+ .print()
+ .map(it -> it + "2")
+ .print()
+ .start();
+ assertThat(out.toString(), equalTo("fizz1" + System.lineSeparator()));
+ }
+
+ @Test
+ void interruptThen() {
+ Flows.from("fizz")
+ .map(it -> it + "1")
+ .print()
+ .then(v -> Flowable.interrupt())
+ .print()
+ .map(it -> it + "2")
+ .print()
+ .start();
+ assertThat(out.toString(), equalTo("fizz1" + System.lineSeparator()));
}
+
private void bindAnswers(String... answers) throws IOException {
Files.writeString(input, String.join("\n", answers) + "\n");
}