You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/05/17 22:10:24 UTC

[kafka] branch trunk updated: MINOR: Add select changes from 3rd KIP-307 PR for incrementing name index counter (#6754)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9077d83  MINOR: Add select changes from 3rd KIP-307 PR for incrementing name index counter (#6754)
9077d83 is described below

commit 9077d83672a4d08273ce4a6012f1787f5313f948
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Fri May 17 18:10:11 2019 -0400

    MINOR: Add select changes from 3rd KIP-307 PR for incrementing name index counter (#6754)
    
    When users provide a name for operation via the Streams DSL, we need to increment the counter used for auto-generated names to make sure any operators downstream of a named operator still produce a compatible name.
    
    This PR is a subset of #6411 by @fhussonnois. We need to merge this PR now because it covers cases when users name repartition topics or state stores.
    
    Updated tests to reflect the counter produces expected number even when the user provides a name.
    
    Matthias J. Sax <mj...@apache.org>,  John Roesler <jo...@confluent.io>
---
 .../org/apache/kafka/streams/kstream/Named.java    |  6 ++-
 .../kstream/internals/InternalStreamsBuilder.java  |  2 +-
 .../streams/kstream/internals/NamedInternal.java   | 48 +++++++++++-----------
 .../apache/kafka/streams/StreamsBuilderTest.java   |  8 ++--
 .../kstream/internals/NamedInternalTest.java       | 48 +++++++++++++---------
 5 files changed, 62 insertions(+), 50 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Named.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Named.java
index 1db031a..84bb819 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Named.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Named.java
@@ -26,6 +26,10 @@ public class Named implements NamedOperation<Named> {
 
     protected String name;
 
+    protected Named(final Named named) {
+        this(Objects.requireNonNull(named, "named can't be null").name);
+    }
+
     protected Named(final String name) {
         this.name = name;
         if (name != null) {
@@ -51,7 +55,7 @@ public class Named implements NamedOperation<Named> {
         return new Named(name);
     }
 
-    static void validate(final String name) {
+    protected static void validate(final String name) {
         if (name.isEmpty())
             throw new TopologyException("Name is illegal, it can't be empty");
         if (name.equals(".") || name.equals(".."))
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index e7a7678..3a90fd2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -118,7 +118,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
         final String sourceName = new NamedInternal(consumed.name())
                 .orElseGenerateWithPrefix(this, KStreamImpl.SOURCE_NAME);
         final String tableSourceName = new NamedInternal(consumed.name())
-                .suffixWithOrElseGet("-table-source", () -> newProcessorName(KTableImpl.SOURCE_NAME));
+                .suffixWithOrElseGet("-table-source", this, KTableImpl.SOURCE_NAME);
         final KTableSource<K, V> tableSource = new KTableSource<>(materialized.storeName(), materialized.queryableStoreName());
         final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(tableSource, tableSourceName);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NamedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NamedInternal.java
index e83728e..d478e9b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NamedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NamedInternal.java
@@ -17,8 +17,6 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.kstream.Named;
-import java.util.Optional;
-import java.util.function.Supplier;
 
 public class NamedInternal extends Named {
 
@@ -33,14 +31,14 @@ public class NamedInternal extends Named {
     /**
      * Creates a new {@link NamedInternal} instance.
      *
-     * @param internal  the internal name.
+     * @param internal the internal name.
      */
     NamedInternal(final String internal) {
         super(internal);
     }
 
     /**
-     * @return  a string name.
+     * @return a string name.
      */
     public String name() {
         return name;
@@ -51,31 +49,31 @@ public class NamedInternal extends Named {
         return new NamedInternal(name);
     }
 
-    /**
-     * Check whether an internal name is defined.
-     * @return {@code false} if no name is set.
-     */
-    public boolean isDefined() {
-        return name != null;
-    }
+    String suffixWithOrElseGet(final String suffix, final InternalNameProvider provider, final String prefix) {
+        // We actually do not need to generate processor names for operation if a name is specified.
+        // But before returning, we still need to burn index for the operation to keep topology backward compatibility.
+        if (name != null) {
+            provider.newProcessorName(prefix);
+
+            final String suffixed = name + suffix;
+            // Re-validate generated name as suffixed string could be too large.
+            Named.validate(suffixed);
 
-    String suffixWithOrElseGet(final String suffix, final Supplier<String> supplier) {
-        final Optional<String> suffixed = Optional.ofNullable(this.name).map(s -> s + suffix);
-        // Creating a new named will re-validate generated name as suffixed string could be too large.
-        return new NamedInternal(suffixed.orElseGet(supplier)).name();
+            return suffixed;
+        } else {
+            return provider.newProcessorName(prefix);
+        }
     }
 
     String orElseGenerateWithPrefix(final InternalNameProvider provider, final String prefix) {
-        return orElseGet(() -> provider.newProcessorName(prefix));
+        // We actually do not need to generate processor names for operation if a name is specified.
+        // But before returning, we still need to burn index for the operation to keep topology backward compatibility.
+        if (name != null) {
+            provider.newProcessorName(prefix);
+            return name;
+        }  else {
+            return provider.newProcessorName(prefix);
+        }
     }
 
-    /**
-     * Returns the internal name or the value returns from the supplier.
-     *
-     * @param supplier  the supplier to be used if internal name is empty.
-     * @return an internal string name.
-     */
-    private String orElseGet(final Supplier<String> supplier) {
-        return Optional.ofNullable(this.name).orElseGet(supplier);
-    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 669cece..93d444b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -424,7 +424,7 @@ public class StreamsBuilderTest {
         builder.stream(STREAM_TOPIC_TWO);
         builder.build();
         final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
-        assertSpecifiedNameForOperation(topology, expected, "KSTREAM-SOURCE-0000000000");
+        assertSpecifiedNameForOperation(topology, expected, "KSTREAM-SOURCE-0000000001");
     }
 
     @Test
@@ -440,8 +440,8 @@ public class StreamsBuilderTest {
                 topology,
                 expected,
                 expected + "-table-source",
-                "KSTREAM-SOURCE-0000000002",
-                "KTABLE-SOURCE-0000000003");
+                "KSTREAM-SOURCE-0000000004",
+                "KTABLE-SOURCE-0000000005");
     }
 
     @Test
@@ -467,7 +467,7 @@ public class StreamsBuilderTest {
         stream.to(STREAM_TOPIC_TWO);
         builder.build();
         final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
-        assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", expected, "KSTREAM-SINK-0000000001");
+        assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", expected, "KSTREAM-SINK-0000000002");
     }
 
     private void assertSpecifiedNameForOperation(final ProcessorTopology topology, final String... expected) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/NamedInternalTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/NamedInternalTest.java
index 98b3a4d..1c4c700 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/NamedInternalTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/NamedInternalTest.java
@@ -22,45 +22,55 @@ import static org.junit.Assert.assertEquals;
 
 public class NamedInternalTest {
 
-    private static final String TEST_VALUE  = "default-value";
+    private static final String TEST_PREFIX = "prefix-";
+    private static final String TEST_VALUE = "default-value";
     private static final String TEST_SUFFIX = "-suffix";
 
+    private static class TestNameProvider implements InternalNameProvider {
+        int index = 0;
+
+        @Override
+        public String newProcessorName(final String prefix) {
+            return prefix + "PROCESSOR-" + index++;
+        }
+
+        @Override
+        public String newStoreName(final String prefix) {
+            return prefix + "STORE-"  + index++;
+        }
+
+    }
+
     @Test
     public void shouldSuffixNameOrReturnProviderValue() {
         final String name = "foo";
+        final TestNameProvider provider = new TestNameProvider();
+
         assertEquals(
-                name + TEST_SUFFIX,
-                NamedInternal.with(name).suffixWithOrElseGet(TEST_SUFFIX, () -> TEST_VALUE)
+            name + TEST_SUFFIX,
+            NamedInternal.with(name).suffixWithOrElseGet(TEST_SUFFIX, provider, TEST_PREFIX)
         );
+
+        // 1, not 0, indicates that the named call still burned an index number.
         assertEquals(
-                TEST_VALUE,
-                NamedInternal.with(null).suffixWithOrElseGet(TEST_SUFFIX, () -> TEST_VALUE)
+            "prefix-PROCESSOR-1",
+            NamedInternal.with(null).suffixWithOrElseGet(TEST_SUFFIX, provider, TEST_PREFIX)
         );
     }
 
     @Test
     public void shouldGenerateWithPrefixGivenEmptyName() {
         final String prefix = "KSTREAM-MAP-";
-        assertEquals(prefix + "PROCESSOR-NAME", NamedInternal.with(null).orElseGenerateWithPrefix(
-                new InternalNameProvider() {
-                    @Override
-                    public String newProcessorName(final String prefix) {
-                        return prefix + "PROCESSOR-NAME";
-                    }
-
-                    @Override
-                    public String newStoreName(final String prefix) {
-                        return null;
-                    }
-                },
-                prefix)
+        assertEquals(prefix + "PROCESSOR-0", NamedInternal.with(null).orElseGenerateWithPrefix(
+            new TestNameProvider(),
+            prefix)
         );
     }
 
     @Test
     public void shouldNotGenerateWithPrefixGivenValidName() {
         final String validName = "validName";
-        assertEquals(validName, NamedInternal.with(validName).orElseGenerateWithPrefix(null, "KSTREAM-MAP-")
+        assertEquals(validName, NamedInternal.with(validName).orElseGenerateWithPrefix(new TestNameProvider(), "KSTREAM-MAP-")
         );
     }
 }
\ No newline at end of file