You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2021/08/10 20:43:02 UTC

[kafka] branch 3.0 updated: MINOR: update the branch(split) doc and java doc and tests (#11195)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 85359ee  MINOR: update the branch(split) doc and java doc and tests (#11195)
85359ee is described below

commit 85359ee57ba41a3b4ab5e00393ae5e67d7a5a325
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Wed Aug 11 04:37:59 2021 +0800

    MINOR: update the branch(split) doc and java doc and tests (#11195)
    
    Reviewers: Ivan Ponomarev <ip...@mail.ru>, Matthias J. Sax <ma...@confluent.io>
---
 docs/streams/developer-guide/dsl-api.html          | 10 +++---
 .../org/apache/kafka/streams/kstream/Branched.java |  1 +
 .../kafka/streams/kstream/BranchedKStream.java     |  7 +++--
 .../org/apache/kafka/streams/kstream/KStream.java  | 16 ++++++----
 .../kstream/internals/KStreamSplitTest.java        | 36 +++++++++++++---------
 5 files changed, 41 insertions(+), 29 deletions(-)

diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html
index ba3b3c8..45affd9 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -360,14 +360,14 @@ GlobalKTable&lt;String, Long&gt; wordCounts = builder.globalTable(
                     <tbody valign="top">
                     <tr class="row-even"><td><p class="first"><strong>Branch</strong></p>
                         <ul class="last simple">
-                            <li>KStream &rarr; KStream[]</li>
+                            <li>KStream &rarr; BranchedKStream</li>
                         </ul>
                     </td>
                         <td><p class="first">Branch (or split) a <code class="docutils literal"><span class="pre">KStream</span></code> based on the supplied predicates into one or more <code class="docutils literal"><span class="pre">KStream</span></code> instances.
-                            (<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#split">details</a>)</p>
+                            (<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#split()">details</a>)</p>
                             <p>Predicates are evaluated in order.  A record is placed to one and only one output stream on the first match:
-                                if the n-th predicate evaluates to true, the record is placed to n-th stream.  If no predicate matches, the
-                                the record is dropped.</p>
+                                if the n-th predicate evaluates to true, the record is placed in the n-th stream. If a record does not match any predicates,
+                                it will be routed to the default branch, or dropped if no default branch is created.</p>
                             <p>Branching is useful, for example, to route records to different downstream topics.</p>
                             <pre class="line-numbers"><code class="language-java">KStream&lt;String, Long&gt; stream = ...;
 Map&lt;String, KStream&lt;String, Long&gt;&gt; branches =
@@ -376,7 +376,7 @@ Map&lt;String, KStream&lt;String, Long&gt;&gt; branches =
              Branched.as(&quot;A&quot;))
         .branch((key, value) -&gt; key.startsWith(&quot;B&quot;),  /* second predicate */
              Branched.as(&quot;B&quot;))
-.defaultBranch(Branched.as(&quot;C&quot;))
+        .defaultBranch(Branched.as(&quot;C&quot;))              /* default branch */
 );
 
 // KStream branches.get(&quot;Branch-A&quot;) contains all records whose keys start with &quot;A&quot;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java
index 713a510..3e85a73 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java
@@ -143,6 +143,7 @@ public class Branched<K, V> implements NamedOperation<Branched<K, V>> {
      */
     @Override
     public Branched<K, V> withName(final String name) {
+        Objects.requireNonNull(name, "name cannot be null");
         return new Branched<>(name, chainFunction, chainConsumer);
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/BranchedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/BranchedKStream.java
index 2370c60..2115170 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/BranchedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/BranchedKStream.java
@@ -22,7 +22,7 @@ import java.util.Map;
  * Branches the records in the original stream based on the predicates supplied for the branch definitions.
  * <p>
  * Branches are defined with {@link BranchedKStream#branch(Predicate, Branched)} or
- * {@link BranchedKStream#defaultBranch(Branched)} methods. Each record is evaluated against the predicates
+ * {@link BranchedKStream#defaultBranch(Branched)} methods. Each record is evaluated against the {@code predicate}
  * supplied via {@link Branched} parameters, and is routed to the first branch for which its respective predicate
  * evaluates to {@code true}. If a record does not match any predicates, it will be routed to the default branch,
  * or dropped if no default branch is created.
@@ -50,7 +50,7 @@ import java.util.Map;
  *     {@link Branched} parameter, its value is appended to the prefix to form the {@code Map} key
  *     <li>If a name is not provided for the branch, then the key defaults to {@code prefix + position} of the branch
  *     as a decimal number, starting from {@code "1"}
- *     <li>If a name is not provided for the {@link BranchedKStream#defaultBranch()} call, then the key defaults
+ *     <li>If a name is not provided for the {@link BranchedKStream#defaultBranch()}, then the key defaults
  *     to {@code prefix + "0"}
  * </ul>
  * The values of the respective {@code Map<Stream, KStream<K, V>>} entries are formed as following:
@@ -69,7 +69,8 @@ import java.util.Map;
  *     .branch(predicate1, Branched.as("bar"))                    // "foo-bar"
  *     .branch(predicate2, Branched.withConsumer(ks->ks.to("A"))  // no entry: a Consumer is provided
  *     .branch(predicate3, Branched.withFunction(ks->null))       // no entry: chain function returns null
- *     .branch(predicate4)                                        // "foo-4": name defaults to the branch position
+ *     .branch(predicate4, Branched.withFunction(ks->ks))         // "foo-4": chain function returns non-null value
+ *     .branch(predicate5)                                        // "foo-5": name defaults to the branch position
  *     .defaultBranch()                                           // "foo-0": "0" is the default name for the default branch
  * }</pre>
  *
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index da7aa23..c2ec757 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -782,18 +782,22 @@ public interface KStream<K, V> {
     KStream<K, V>[] branch(final Named named, final Predicate<? super K, ? super V>... predicates);
 
     /**
-     * Split this stream. {@link BranchedKStream} can be used for routing the records to different branches depending
-     * on evaluation against the supplied predicates.
-     * Stream branching is a stateless record-by-record operation.
+     * Split this stream into different branches. The returned {@link BranchedKStream} instance can be used for routing
+     * the records to different branches depending on evaluation against the supplied predicates.
+     * <p>
+     *     Note: Stream branching is a stateless record-by-record operation.
+     *     Please check {@link BranchedKStream} for a detailed description and usage example.
      *
      * @return {@link BranchedKStream} that provides methods for routing the records to different branches.
      */
     BranchedKStream<K, V> split();
 
     /**
-     * Split this stream. {@link BranchedKStream} can be used for routing the records to different branches depending
-     * on evaluation against the supplied predicates.
-     * Stream branching is a stateless record-by-record operation.
+     * Split this stream into different branches. The returned {@link BranchedKStream} instance can be used for routing
+     * the records to different branches depending on evaluation against the supplied predicates.
+     * <p>
+     *     Note: Stream branching is a stateless record-by-record operation.
+     *     Please check {@link BranchedKStream} for a detailed description and usage example.
      *
      * @param named  a {@link Named} config used to name the processor in the topology and also to set the name prefix
      *               for the resulting branches (see {@link BranchedKStream})
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSplitTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSplitTest.java
index 4b2e684..29eaf1a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSplitTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSplitTest.java
@@ -50,6 +50,7 @@ public class KStreamSplitTest {
     private final Predicate<Integer, String> isMultipleOfThree = (key, value) -> (key % 3) == 0;
     private final Predicate<Integer, String> isMultipleOfFive = (key, value) -> (key % 5) == 0;
     private final Predicate<Integer, String> isMultipleOfSeven = (key, value) -> (key % 7) == 0;
+    private final Predicate<Integer, String> isNegative = (key, value) -> key < 0;
     private final KStream<Integer, String> source = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
 
     @Test
@@ -68,14 +69,14 @@ public class KStreamSplitTest {
             final TestOutputTopic<Integer, String> x2 = driver.createOutputTopic("x2", new IntegerDeserializer(), new StringDeserializer());
             final TestOutputTopic<Integer, String> x3 = driver.createOutputTopic("x3", new IntegerDeserializer(), new StringDeserializer());
             final TestOutputTopic<Integer, String> x5 = driver.createOutputTopic("x5", new IntegerDeserializer(), new StringDeserializer());
-            assertEquals(Arrays.asList("V2", "V4", "V6"), x2.readValuesToList());
+            assertEquals(Arrays.asList("V0", "V2", "V4", "V6"), x2.readValuesToList());
             assertEquals(Arrays.asList("V3"), x3.readValuesToList());
             assertEquals(Arrays.asList("V5"), x5.readValuesToList());
         });
     }
 
     private void withDriver(final Consumer<TopologyTestDriver> test) {
-        final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
+        final int[] expectedKeys = new int[]{-1, 0, 1, 2, 3, 4, 5, 6, 7};
         final Topology topology = builder.build();
         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
             final TestInputTopic<Integer, String> inputTopic = driver.createInputTopic(topicName, new IntegerSerializer(), new StringSerializer());
@@ -104,25 +105,29 @@ public class KStreamSplitTest {
                         // "foo-bar"
                         .branch(isEven, Branched.as("bar"))
                         // no entry: a Consumer is provided
-                        .branch(isMultipleOfThree, Branched.withConsumer(ks -> {
-                        }))
+                        .branch(isMultipleOfThree, Branched.withConsumer(ks -> { }))
                         // no entry: chain function returns null
                         .branch(isMultipleOfFive, Branched.withFunction(ks -> null))
-                        // "foo-4": name defaults to the branch position
+                        // "foo-4": chain function returns non-null value
+                        .branch(isNegative, Branched.withFunction(ks -> ks))
+                        // "foo-5": name defaults to the branch position
                         .branch(isMultipleOfSeven)
                         // "foo-0": "0" is the default name for the default branch
                         .defaultBranch();
-        assertEquals(3, branches.size());
-        branches.get("foo-bar").to("foo-bar");
-        branches.get("foo-4").to("foo-4");
-        branches.get("foo-0").to("foo-0");
+        assertEquals(4, branches.size());
+        // direct the branched streams into different topics named with branch name
+        for (final Map.Entry<String, KStream<Integer, String>> branch: branches.entrySet()) {
+            branch.getValue().to(branch.getKey());
+        }
         builder.build();
 
         withDriver(driver -> {
             final TestOutputTopic<Integer, String> even = driver.createOutputTopic("foo-bar", new IntegerDeserializer(), new StringDeserializer());
-            final TestOutputTopic<Integer, String> x7 = driver.createOutputTopic("foo-4", new IntegerDeserializer(), new StringDeserializer());
+            final TestOutputTopic<Integer, String> negative = driver.createOutputTopic("foo-4", new IntegerDeserializer(), new StringDeserializer());
+            final TestOutputTopic<Integer, String> x7 = driver.createOutputTopic("foo-5", new IntegerDeserializer(), new StringDeserializer());
             final TestOutputTopic<Integer, String> defaultBranch = driver.createOutputTopic("foo-0", new IntegerDeserializer(), new StringDeserializer());
-            assertEquals(Arrays.asList("V2", "V4", "V6"), even.readValuesToList());
+            assertEquals(Arrays.asList("V0", "V2", "V4", "V6"), even.readValuesToList());
+            assertEquals(Arrays.asList("V-1"), negative.readValuesToList());
             assertEquals(Arrays.asList("V7"), x7.readValuesToList());
             assertEquals(Arrays.asList("V1"), defaultBranch.readValuesToList());
         });
@@ -130,14 +135,15 @@ public class KStreamSplitTest {
 
     @Test
     public void testBranchingWithNoTerminalOperation() {
+        final String outputTopicName = "output";
         source.split()
-                .branch(isEven, Branched.withConsumer(ks -> ks.to("output")))
-                .branch(isMultipleOfFive, Branched.withConsumer(ks -> ks.to("output")));
+                .branch(isEven, Branched.withConsumer(ks -> ks.to(outputTopicName)))
+                .branch(isMultipleOfFive, Branched.withConsumer(ks -> ks.to(outputTopicName)));
         builder.build();
         withDriver(driver -> {
             final TestOutputTopic<Integer, String> outputTopic =
-                    driver.createOutputTopic("output", new IntegerDeserializer(), new StringDeserializer());
-            assertEquals(Arrays.asList("V2", "V4", "V5", "V6"), outputTopic.readValuesToList());
+                    driver.createOutputTopic(outputTopicName, new IntegerDeserializer(), new StringDeserializer());
+            assertEquals(Arrays.asList("V0", "V2", "V4", "V5", "V6"), outputTopic.readValuesToList());
         });
     }
 }