You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/03/20 02:27:14 UTC
[kafka] branch trunk updated: KAFKA-6958: Add new NamedOperation
interface to enforce consistency in naming operations (#6409)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fa57eb0 KAFKA-6958: Add new NamedOperation interface to enforce consistency in naming operations (#6409)
fa57eb0 is described below
commit fa57eb065d032c225f63a0b2ca3f050e728c2235
Author: Florian Hussonnois <fh...@gmail.com>
AuthorDate: Wed Mar 20 03:27:03 2019 +0100
KAFKA-6958: Add new NamedOperation interface to enforce consistency in naming operations (#6409)
Sub-task required to allow defining custom processor names with the KStreams DSL (KIP-307):
- add new public interface NamedOperation
- deprecate methods Joined.named() and Joined.name()
- update Suppressed interface to extend NamedOperation
Reviewers: Matthias J. Sax <mj...@apache.org>, John Roesler <jo...@confluent.io>, Bill Bejeck <bb...@gmail.com>
---
.../org/apache/kafka/streams/kstream/Joined.java | 40 ++++++++++++++++---
.../kafka/streams/kstream/NamedOperation.java | 32 +++++++++++++++
.../apache/kafka/streams/kstream/Suppressed.java | 3 +-
.../streams/kstream/internals/JoinedInternal.java | 45 ++++++++++++++++++++++
.../streams/kstream/internals/KStreamImpl.java | 15 ++++++--
5 files changed, 124 insertions(+), 11 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
index aa29c68..1343487 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
@@ -22,13 +22,12 @@ import org.apache.kafka.common.serialization.Serde;
* The {@code Joined} class represents optional params that can be passed to
* {@link KStream#join}, {@link KStream#leftJoin}, and {@link KStream#outerJoin} operations.
*/
-public class Joined<K, V, VO> {
-
- private final Serde<K> keySerde;
- private final Serde<V> valueSerde;
- private final Serde<VO> otherValueSerde;
- private final String name;
+public class Joined<K, V, VO> implements NamedOperation<Joined<K, V, VO>> {
+ protected final Serde<K> keySerde;
+ protected final Serde<V> valueSerde;
+ protected final Serde<VO> otherValueSerde;
+ protected final String name;
private Joined(final Serde<K> keySerde,
final Serde<V> valueSerde,
@@ -40,6 +39,10 @@ public class Joined<K, V, VO> {
this.name = name;
}
+ protected Joined(final Joined<K, V, VO> joined) {
+ this(joined.keySerde, joined.valueSerde, joined.otherValueSerde, joined.name);
+ }
+
/**
* Create an instance of {@code Joined} with key, value, and otherValue {@link Serde} instances.
* {@code null} values are accepted and will be replaced by the default serdes as defined in config.
@@ -135,11 +138,30 @@ public class Joined<K, V, VO> {
* @param <V> value type
* @param <VO> other value type
* @return new {@code Joined} instance configured with the name
+ *
+ * @deprecated use {@link #as(String)} instead
*/
+ @Deprecated
public static <K, V, VO> Joined<K, V, VO> named(final String name) {
return new Joined<>(null, null, null, name);
}
+ /**
+ * Create an instance of {@code Joined} with base name for all components of the join, this may
+ * include any repartition topics created to complete the join.
+ *
+ * @param name the name used as the base for naming components of the join including any
+ * repartition topics
+ * @param <K> key type
+ * @param <V> value type
+ * @param <VO> other value type
+ * @return new {@code Joined} instance configured with the name
+ *
+ */
+ public static <K, V, VO> Joined<K, V, VO> as(final String name) {
+ return new Joined<>(null, null, null, name);
+ }
+
/**
* Set the key {@link Serde} to be used. Null values are accepted and will be replaced by the default
@@ -182,6 +204,7 @@ public class Joined<K, V, VO> {
* repartition topics
* @return new {@code Joined} instance configured with the {@code name}
*/
+ @Override
public Joined<K, V, VO> withName(final String name) {
return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
}
@@ -198,7 +221,12 @@ public class Joined<K, V, VO> {
return otherValueSerde;
}
+ /**
+ * @deprecated this method will be removed in a future release
+ */
+ @Deprecated
public String name() {
return name;
}
+
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/NamedOperation.java b/streams/src/main/java/org/apache/kafka/streams/kstream/NamedOperation.java
new file mode 100644
index 0000000..9a2c40b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/NamedOperation.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+/**
+ * Default interface which can be used to personalize the names of operations, internal topics or stores.
+ */
+interface NamedOperation<T extends NamedOperation<T>> {
+
+ /**
+ * Sets the name to be used for an operation.
+ *
+ * @param name the name to use.
+ * @return an instance of {@link NamedOperation}
+ */
+ T withName(final String name);
+
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
index 6854101..b5d7937 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
import java.time.Duration;
-public interface Suppressed<K> {
+public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {
/**
* Marker interface for a buffer configuration that is "strict" in the sense that it will strictly
@@ -163,5 +163,6 @@ public interface Suppressed<K> {
* @param name The name to be used for the suppression node and changelog topic
* @return The same configuration with the addition of the given {@code name}.
*/
+ @Override
Suppressed<K> withName(final String name);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/JoinedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/JoinedInternal.java
new file mode 100644
index 0000000..99f7a0f
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/JoinedInternal.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.Joined;
+
+public class JoinedInternal<K, V, VO> extends Joined<K, V, VO> {
+
+ JoinedInternal(final Joined<K, V, VO> joined) {
+ super(joined);
+ }
+
+ public Serde<K> keySerde() {
+ return keySerde;
+ }
+
+ public Serde<V> valueSerde() {
+ return valueSerde;
+ }
+
+ public Serde<VO> otherValueSerde() {
+ return otherValueSerde;
+ }
+
+ @Override // TODO remove annotation when super.name() is removed
+ @SuppressWarnings("deprecation") // this method should not be removed if super.name() is removed
+ public String name() {
+ return name;
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 856536c..41260c5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -567,13 +567,15 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
KStreamImpl<K, V> joinThis = this;
KStreamImpl<K, VO> joinOther = (KStreamImpl<K, VO>) other;
+ final JoinedInternal<K, V, VO> joinedInternal = new JoinedInternal<>(joined);
+ final String name = joinedInternal.name();
if (joinThis.repartitionRequired) {
- final String leftJoinRepartitionTopicName = joined.name() != null ? joined.name() + "-left" : joinThis.name;
+ final String leftJoinRepartitionTopicName = name != null ? name + "-left" : joinThis.name;
joinThis = joinThis.repartitionForJoin(leftJoinRepartitionTopicName, joined.keySerde(), joined.valueSerde());
}
if (joinOther.repartitionRequired) {
- final String rightJoinRepartitionTopicName = joined.name() != null ? joined.name() + "-right" : joinOther.name;
+ final String rightJoinRepartitionTopicName = name != null ? name + "-right" : joinOther.name;
joinOther = joinOther.repartitionForJoin(rightJoinRepartitionTopicName, joined.keySerde(), joined.otherValueSerde());
}
@@ -679,9 +681,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
Objects.requireNonNull(other, "other can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
Objects.requireNonNull(joined, "joined can't be null");
+
+ final JoinedInternal<K, V, VO> joinedInternal = new JoinedInternal<>(joined);
+ final String name = joinedInternal.name();
if (repartitionRequired) {
final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(
- joined.name() != null ? joined.name() : name,
+ name != null ? name : this.name,
joined.keySerde(),
joined.valueSerde()
);
@@ -703,9 +708,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
Objects.requireNonNull(other, "other can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
Objects.requireNonNull(joined, "joined can't be null");
+ final JoinedInternal<K, V, VO> joinedInternal = new JoinedInternal<>(joined);
+ final String internalName = joinedInternal.name();
if (repartitionRequired) {
final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(
- joined.name() != null ? joined.name() : name,
+ internalName != null ? internalName : name,
joined.keySerde(),
joined.valueSerde()
);