You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/03/20 02:27:14 UTC

[kafka] branch trunk updated: KAFKA-6958: Add new NamedOperation interface to enforce consistency in naming operations (#6409)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fa57eb0  KAFKA-6958: Add new NamedOperation interface to enforce consistency in naming operations (#6409)
fa57eb0 is described below

commit fa57eb065d032c225f63a0b2ca3f050e728c2235
Author: Florian Hussonnois <fh...@gmail.com>
AuthorDate: Wed Mar 20 03:27:03 2019 +0100

    KAFKA-6958: Add new NamedOperation interface to enforce consistency in naming operations (#6409)
    
    Sub-task required to allow to define custom processor names with KStreams DSL(KIP-307) :
      - add new public interface NamedOperation
      - deprecate methods Joined.as() and Joined.name()
      - update Suppredded interface to extend NamedOperation
    
    Reviewers: Matthias J. Sax <mj...@apache.org>,  John Roesler <jo...@confluent.io>, Bill Bejeck <bb...@gmail.com>
---
 .../org/apache/kafka/streams/kstream/Joined.java   | 40 ++++++++++++++++---
 .../kafka/streams/kstream/NamedOperation.java      | 32 +++++++++++++++
 .../apache/kafka/streams/kstream/Suppressed.java   |  3 +-
 .../streams/kstream/internals/JoinedInternal.java  | 45 ++++++++++++++++++++++
 .../streams/kstream/internals/KStreamImpl.java     | 15 ++++++--
 5 files changed, 124 insertions(+), 11 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
index aa29c68..1343487 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
@@ -22,13 +22,12 @@ import org.apache.kafka.common.serialization.Serde;
  * The {@code Joined} class represents optional params that can be passed to
  * {@link KStream#join}, {@link KStream#leftJoin}, and  {@link KStream#outerJoin} operations.
  */
-public class Joined<K, V, VO> {
-
-    private final Serde<K> keySerde;
-    private final Serde<V> valueSerde;
-    private final Serde<VO> otherValueSerde;
-    private final String name;
+public class Joined<K, V, VO> implements NamedOperation<Joined<K, V, VO>> {
 
+    protected final Serde<K> keySerde;
+    protected final Serde<V> valueSerde;
+    protected final Serde<VO> otherValueSerde;
+    protected final String name;
 
     private Joined(final Serde<K> keySerde,
                    final Serde<V> valueSerde,
@@ -40,6 +39,10 @@ public class Joined<K, V, VO> {
         this.name = name;
     }
 
+    protected Joined(final Joined<K, V, VO> joined) {
+        this(joined.keySerde, joined.valueSerde, joined.otherValueSerde, joined.name);
+    }
+
     /**
      * Create an instance of {@code Joined} with key, value, and otherValue {@link Serde} instances.
      * {@code null} values are accepted and will be replaced by the default serdes as defined in config.
@@ -135,11 +138,30 @@ public class Joined<K, V, VO> {
      * @param <V> value type
      * @param <VO> other value type
      * @return new {@code Joined} instance configured with the name
+     *
+     * @deprecated use {@link #as(String)} instead
      */
+    @Deprecated
     public static <K, V, VO> Joined<K, V, VO> named(final String name) {
         return new Joined<>(null, null, null, name);
     }
 
+    /**
+     * Create an instance of {@code Joined} with base name for all components of the join, this may
+     * include any repartition topics created to complete the join.
+     *
+     * @param name the name used as the base for naming components of the join including any
+     * repartition topics
+     * @param <K> key type
+     * @param <V> value type
+     * @param <VO> other value type
+     * @return new {@code Joined} instance configured with the name
+     *
+     */
+    public static <K, V, VO> Joined<K, V, VO> as(final String name) {
+        return new Joined<>(null, null, null, name);
+    }
+
 
     /**
      * Set the key {@link Serde} to be used. Null values are accepted and will be replaced by the default
@@ -182,6 +204,7 @@ public class Joined<K, V, VO> {
      * repartition topics
      * @return new {@code Joined} instance configured with the {@code name}
      */
+    @Override
     public Joined<K, V, VO> withName(final String name) {
         return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
     }
@@ -198,7 +221,12 @@ public class Joined<K, V, VO> {
         return otherValueSerde;
     }
 
+    /**
+     * @deprecated this method will be removed in a in a future release
+     */
+    @Deprecated
     public String name() {
         return name;
     }
+
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/NamedOperation.java b/streams/src/main/java/org/apache/kafka/streams/kstream/NamedOperation.java
new file mode 100644
index 0000000..9a2c40b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/NamedOperation.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+/**
+ * Default interface which can be used to personalized the named of operations, internal topics or store.
+ */
+interface NamedOperation<T extends NamedOperation<T>> {
+
+    /**
+     * Sets the name to be used for an operation.
+     *
+     * @param name  the name to use.
+     * @return an instance of {@link NamedOperation}
+     */
+    T withName(final String name);
+
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
index 6854101..b5d7937 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
 
 import java.time.Duration;
 
-public interface Suppressed<K> {
+public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {
 
     /**
      * Marker interface for a buffer configuration that is "strict" in the sense that it will strictly
@@ -163,5 +163,6 @@ public interface Suppressed<K> {
      * @param name The name to be used for the suppression node and changelog topic
      * @return The same configuration with the addition of the given {@code name}.
      */
+    @Override
     Suppressed<K> withName(final String name);
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/JoinedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/JoinedInternal.java
new file mode 100644
index 0000000..99f7a0f
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/JoinedInternal.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.Joined;
+
+public class JoinedInternal<K, V, VO> extends Joined<K, V, VO>  {
+
+    JoinedInternal(final Joined<K, V, VO> joined) {
+        super(joined);
+    }
+
+    public Serde<K> keySerde() {
+        return keySerde;
+    }
+
+    public Serde<V> valueSerde() {
+        return valueSerde;
+    }
+
+    public Serde<VO> otherValueSerde() {
+        return otherValueSerde;
+    }
+
+    @Override // TODO remove annotation when super.name() is removed
+    @SuppressWarnings("deprecation") // this method should not be removed if super.name() is removed
+    public String name() {
+        return name;
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 856536c..41260c5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -567,13 +567,15 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
         KStreamImpl<K, V> joinThis = this;
         KStreamImpl<K, VO> joinOther = (KStreamImpl<K, VO>) other;
 
+        final JoinedInternal<K, V, VO> joinedInternal = new JoinedInternal<>(joined);
+        final String name = joinedInternal.name();
         if (joinThis.repartitionRequired) {
-            final String leftJoinRepartitionTopicName = joined.name() != null ? joined.name() + "-left" : joinThis.name;
+            final String leftJoinRepartitionTopicName = name != null ? name + "-left" : joinThis.name;
             joinThis = joinThis.repartitionForJoin(leftJoinRepartitionTopicName, joined.keySerde(), joined.valueSerde());
         }
 
         if (joinOther.repartitionRequired) {
-            final String rightJoinRepartitionTopicName = joined.name() != null ? joined.name() + "-right" : joinOther.name;
+            final String rightJoinRepartitionTopicName = name != null ? name + "-right" : joinOther.name;
             joinOther = joinOther.repartitionForJoin(rightJoinRepartitionTopicName, joined.keySerde(), joined.otherValueSerde());
         }
 
@@ -679,9 +681,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
         Objects.requireNonNull(other, "other can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
         Objects.requireNonNull(joined, "joined can't be null");
+
+        final JoinedInternal<K, V, VO> joinedInternal = new JoinedInternal<>(joined);
+        final String name = joinedInternal.name();
         if (repartitionRequired) {
             final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(
-                joined.name() != null ? joined.name() : name,
+                name != null ? name : this.name,
                 joined.keySerde(),
                 joined.valueSerde()
             );
@@ -703,9 +708,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
         Objects.requireNonNull(other, "other can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
         Objects.requireNonNull(joined, "joined can't be null");
+        final JoinedInternal<K, V, VO> joinedInternal = new JoinedInternal<>(joined);
+        final String internalName = joinedInternal.name();
         if (repartitionRequired) {
             final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(
-                joined.name() != null ? joined.name() : name,
+                internalName != null ? internalName : name,
                 joined.keySerde(),
                 joined.valueSerde()
             );