You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2021/08/10 20:43:02 UTC
[kafka] branch 3.0 updated: MINOR: update the branch(split) doc and
java doc and tests (#11195)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 85359ee MINOR: update the branch(split) doc and java doc and tests (#11195)
85359ee is described below
commit 85359ee57ba41a3b4ab5e00393ae5e67d7a5a325
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Wed Aug 11 04:37:59 2021 +0800
MINOR: update the branch(split) doc and java doc and tests (#11195)
Reviewers: Ivan Ponomarev <ip...@mail.ru>, Matthias J. Sax <ma...@confluent.io>
---
docs/streams/developer-guide/dsl-api.html | 10 +++---
.../org/apache/kafka/streams/kstream/Branched.java | 1 +
.../kafka/streams/kstream/BranchedKStream.java | 7 +++--
.../org/apache/kafka/streams/kstream/KStream.java | 16 ++++++----
.../kstream/internals/KStreamSplitTest.java | 36 +++++++++++++---------
5 files changed, 41 insertions(+), 29 deletions(-)
diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html
index ba3b3c8..45affd9 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -360,14 +360,14 @@ GlobalKTable<String, Long> wordCounts = builder.globalTable(
<tbody valign="top">
<tr class="row-even"><td><p class="first"><strong>Branch</strong></p>
<ul class="last simple">
- <li>KStream → KStream[]</li>
+ <li>KStream → BranchedKStream</li>
</ul>
</td>
<td><p class="first">Branch (or split) a <code class="docutils literal"><span class="pre">KStream</span></code> based on the supplied predicates into one or more <code class="docutils literal"><span class="pre">KStream</span></code> instances.
- (<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#split">details</a>)</p>
+ (<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#split()">details</a>)</p>
<p>Predicates are evaluated in order. A record is placed to one and only one output stream on the first match:
- if the n-th predicate evaluates to true, the record is placed to n-th stream. If no predicate matches, the
- the record is dropped.</p>
+ if the n-th predicate evaluates to true, the record is placed to n-th stream. If a record does not match any predicates,
+ it will be routed to the default branch, or dropped if no default branch is created.</p>
<p>Branching is useful, for example, to route records to different downstream topics.</p>
<pre class="line-numbers"><code class="language-java">KStream<String, Long> stream = ...;
Map<String, KStream<String, Long>> branches =
@@ -376,7 +376,7 @@ Map<String, KStream<String, Long>> branches =
Branched.as("A"))
.branch((key, value) -> key.startsWith("B"), /* second predicate */
Branched.as("B"))
-.defaultBranch(Branched.as("C"))
+ .defaultBranch(Branched.as("C")) /* default branch */
);
// KStream branches.get("Branch-A") contains all records whose keys start with "A"
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java
index 713a510..3e85a73 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java
@@ -143,6 +143,7 @@ public class Branched<K, V> implements NamedOperation<Branched<K, V>> {
*/
@Override
public Branched<K, V> withName(final String name) {
+ Objects.requireNonNull(name, "name cannot be null");
return new Branched<>(name, chainFunction, chainConsumer);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/BranchedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/BranchedKStream.java
index 2370c60..2115170 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/BranchedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/BranchedKStream.java
@@ -22,7 +22,7 @@ import java.util.Map;
* Branches the records in the original stream based on the predicates supplied for the branch definitions.
* <p>
* Branches are defined with {@link BranchedKStream#branch(Predicate, Branched)} or
- * {@link BranchedKStream#defaultBranch(Branched)} methods. Each record is evaluated against the predicates
+ * {@link BranchedKStream#defaultBranch(Branched)} methods. Each record is evaluated against the {@code predicate}
* supplied via {@link Branched} parameters, and is routed to the first branch for which its respective predicate
* evaluates to {@code true}. If a record does not match any predicates, it will be routed to the default branch,
* or dropped if no default branch is created.
@@ -50,7 +50,7 @@ import java.util.Map;
* {@link Branched} parameter, its value is appended to the prefix to form the {@code Map} key
* <li>If a name is not provided for the branch, then the key defaults to {@code prefix + position} of the branch
* as a decimal number, starting from {@code "1"}
- * <li>If a name is not provided for the {@link BranchedKStream#defaultBranch()} call, then the key defaults
+ * <li>If a name is not provided for the {@link BranchedKStream#defaultBranch()}, then the key defaults
* to {@code prefix + "0"}
* </ul>
* The values of the respective {@code Map<String, KStream<K, V>>} entries are formed as follows:
@@ -69,7 +69,8 @@ import java.util.Map;
* .branch(predicate1, Branched.as("bar")) // "foo-bar"
* .branch(predicate2, Branched.withConsumer(ks->ks.to("A"))) // no entry: a Consumer is provided
* .branch(predicate3, Branched.withFunction(ks->null)) // no entry: chain function returns null
- * .branch(predicate4) // "foo-4": name defaults to the branch position
+ * .branch(predicate4, Branched.withFunction(ks->ks)) // "foo-4": chain function returns non-null value
+ * .branch(predicate5) // "foo-5": name defaults to the branch position
* .defaultBranch() // "foo-0": "0" is the default name for the default branch
* }</pre>
*
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index da7aa23..c2ec757 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -782,18 +782,22 @@ public interface KStream<K, V> {
KStream<K, V>[] branch(final Named named, final Predicate<? super K, ? super V>... predicates);
/**
- * Split this stream. {@link BranchedKStream} can be used for routing the records to different branches depending
- * on evaluation against the supplied predicates.
- * Stream branching is a stateless record-by-record operation.
+ * Split this stream into different branches. The returned {@link BranchedKStream} instance can be used for routing
+ * the records to different branches depending on evaluation against the supplied predicates.
+ * <p>
+ * Note: Stream branching is a stateless record-by-record operation.
+ * Please check {@link BranchedKStream} for a detailed description and usage example.
*
* @return {@link BranchedKStream} that provides methods for routing the records to different branches.
*/
BranchedKStream<K, V> split();
/**
- * Split this stream. {@link BranchedKStream} can be used for routing the records to different branches depending
- * on evaluation against the supplied predicates.
- * Stream branching is a stateless record-by-record operation.
+ * Split this stream into different branches. The returned {@link BranchedKStream} instance can be used for routing
+ * the records to different branches depending on evaluation against the supplied predicates.
+ * <p>
+ * Note: Stream branching is a stateless record-by-record operation.
+ * Please check {@link BranchedKStream} for a detailed description and usage example.
*
* @param named a {@link Named} config used to name the processor in the topology and also to set the name prefix
* for the resulting branches (see {@link BranchedKStream})
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSplitTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSplitTest.java
index 4b2e684..29eaf1a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSplitTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSplitTest.java
@@ -50,6 +50,7 @@ public class KStreamSplitTest {
private final Predicate<Integer, String> isMultipleOfThree = (key, value) -> (key % 3) == 0;
private final Predicate<Integer, String> isMultipleOfFive = (key, value) -> (key % 5) == 0;
private final Predicate<Integer, String> isMultipleOfSeven = (key, value) -> (key % 7) == 0;
+ private final Predicate<Integer, String> isNegative = (key, value) -> key < 0;
private final KStream<Integer, String> source = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
@Test
@@ -68,14 +69,14 @@ public class KStreamSplitTest {
final TestOutputTopic<Integer, String> x2 = driver.createOutputTopic("x2", new IntegerDeserializer(), new StringDeserializer());
final TestOutputTopic<Integer, String> x3 = driver.createOutputTopic("x3", new IntegerDeserializer(), new StringDeserializer());
final TestOutputTopic<Integer, String> x5 = driver.createOutputTopic("x5", new IntegerDeserializer(), new StringDeserializer());
- assertEquals(Arrays.asList("V2", "V4", "V6"), x2.readValuesToList());
+ assertEquals(Arrays.asList("V0", "V2", "V4", "V6"), x2.readValuesToList());
assertEquals(Arrays.asList("V3"), x3.readValuesToList());
assertEquals(Arrays.asList("V5"), x5.readValuesToList());
});
}
private void withDriver(final Consumer<TopologyTestDriver> test) {
- final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
+ final int[] expectedKeys = new int[]{-1, 0, 1, 2, 3, 4, 5, 6, 7};
final Topology topology = builder.build();
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
final TestInputTopic<Integer, String> inputTopic = driver.createInputTopic(topicName, new IntegerSerializer(), new StringSerializer());
@@ -104,25 +105,29 @@ public class KStreamSplitTest {
// "foo-bar"
.branch(isEven, Branched.as("bar"))
// no entry: a Consumer is provided
- .branch(isMultipleOfThree, Branched.withConsumer(ks -> {
- }))
+ .branch(isMultipleOfThree, Branched.withConsumer(ks -> { }))
// no entry: chain function returns null
.branch(isMultipleOfFive, Branched.withFunction(ks -> null))
- // "foo-4": name defaults to the branch position
+ // "foo-4": chain function returns non-null value
+ .branch(isNegative, Branched.withFunction(ks -> ks))
+ // "foo-5": name defaults to the branch position
.branch(isMultipleOfSeven)
// "foo-0": "0" is the default name for the default branch
.defaultBranch();
- assertEquals(3, branches.size());
- branches.get("foo-bar").to("foo-bar");
- branches.get("foo-4").to("foo-4");
- branches.get("foo-0").to("foo-0");
+ assertEquals(4, branches.size());
+ // direct the branched streams into different topics named with branch name
+ for (final Map.Entry<String, KStream<Integer, String>> branch: branches.entrySet()) {
+ branch.getValue().to(branch.getKey());
+ }
builder.build();
withDriver(driver -> {
final TestOutputTopic<Integer, String> even = driver.createOutputTopic("foo-bar", new IntegerDeserializer(), new StringDeserializer());
- final TestOutputTopic<Integer, String> x7 = driver.createOutputTopic("foo-4", new IntegerDeserializer(), new StringDeserializer());
+ final TestOutputTopic<Integer, String> negative = driver.createOutputTopic("foo-4", new IntegerDeserializer(), new StringDeserializer());
+ final TestOutputTopic<Integer, String> x7 = driver.createOutputTopic("foo-5", new IntegerDeserializer(), new StringDeserializer());
final TestOutputTopic<Integer, String> defaultBranch = driver.createOutputTopic("foo-0", new IntegerDeserializer(), new StringDeserializer());
- assertEquals(Arrays.asList("V2", "V4", "V6"), even.readValuesToList());
+ assertEquals(Arrays.asList("V0", "V2", "V4", "V6"), even.readValuesToList());
+ assertEquals(Arrays.asList("V-1"), negative.readValuesToList());
assertEquals(Arrays.asList("V7"), x7.readValuesToList());
assertEquals(Arrays.asList("V1"), defaultBranch.readValuesToList());
});
@@ -130,14 +135,15 @@ public class KStreamSplitTest {
@Test
public void testBranchingWithNoTerminalOperation() {
+ final String outputTopicName = "output";
source.split()
- .branch(isEven, Branched.withConsumer(ks -> ks.to("output")))
- .branch(isMultipleOfFive, Branched.withConsumer(ks -> ks.to("output")));
+ .branch(isEven, Branched.withConsumer(ks -> ks.to(outputTopicName)))
+ .branch(isMultipleOfFive, Branched.withConsumer(ks -> ks.to(outputTopicName)));
builder.build();
withDriver(driver -> {
final TestOutputTopic<Integer, String> outputTopic =
- driver.createOutputTopic("output", new IntegerDeserializer(), new StringDeserializer());
- assertEquals(Arrays.asList("V2", "V4", "V5", "V6"), outputTopic.readValuesToList());
+ driver.createOutputTopic(outputTopicName, new IntegerDeserializer(), new StringDeserializer());
+ assertEquals(Arrays.asList("V0", "V2", "V4", "V5", "V6"), outputTopic.readValuesToList());
});
}
}