Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/17 02:49:19 UTC

[GitHub] [kafka] mjsax opened a new pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

mjsax opened a new pull request #8679:
URL: https://github.com/apache/kafka/pull/8679


    - part of KIP-221


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r426337376



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##########
@@ -846,16 +847,13 @@
      * from the auto-generated topic using default serializers, deserializers, and producer's {@link DefaultPartitioner}.
      * The number of partitions is determined based on the upstream topics partition numbers.
      * <p>
-     * This operation is similar to {@link #through(String)}, however, Kafka Streams manages the used topic automatically.

Review comment:
       I'd agree with removing it. I guess if you want to preserve it in some fashion, you could add the opposite statement to the `through()` documentation.

##########
File path: streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Repartitioned.scala
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.kstream
+
+import org.apache.kafka.common.serialization.Serde
+import org.apache.kafka.streams.kstream.{Repartitioned => RepartitionedJ}
+import org.apache.kafka.streams.processor.StreamPartitioner
+
+object Repartitioned {
+
+  /**
+   * Create a Repartitioned instance with provided keySerde and valueSerde.
+   *
+   * @tparam K         key type
+   * @tparam V         value type
+   * @param keySerde    Serde to use for serializing the key
+   * @param valueSerde  Serde to use for serializing the value
+   * @return A new [[Repartitioned]] instance configured with keySerde and valueSerde
+   * @see KStream#repartition(Repartitioned)
+   */
+  def `with`[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): RepartitionedJ[K, V] =

Review comment:
       We'd have to change from `object` to `class` or `case class` (which would have been my preference to begin with), since `object`s can only have static members.
   
   Probably, this ship has sailed for now, and we should just keep doing what the other similar classes are doing. Since we've found so much wackiness in the Scala API since it was introduced, it might be a good idea to consider revamping the whole thing from scratch some day.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##########
@@ -815,9 +815,10 @@
      *
      * @param topic the topic name
      * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
-     * @see #repartition()
-     * @see #repartition(Repartitioned)
+     * @deprecated use {@link #repartition()} instead

Review comment:
       For future reference, it's nice to also say when it became deprecated, such as "since 2.6".







[GitHub] [kafka] mjsax commented on pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#issuecomment-629734751


   Call for review @lkokhreidze @vvcephei 
   
   Also updates the Scala API...





[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r427752328



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
##########
@@ -254,6 +253,7 @@ private Topology getTopologyWithChangingValuesAfterChangingKey(final String opti
 
     }
 
+    @SuppressWarnings("deprecation") // specifically testing the deprecated variant

Review comment:
       Updated







[GitHub] [kafka] mjsax merged pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

Posted by GitBox <gi...@apache.org>.
mjsax merged pull request #8679:
URL: https://github.com/apache/kafka/pull/8679


   





[GitHub] [kafka] vvcephei commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r427617788



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##########
@@ -484,8 +493,14 @@ private Topology setupTopologyWithIntermediateUserTopic(final String outputTopic
             .toStream()
             .to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long()));
 
-        input.through(INTERMEDIATE_USER_TOPIC)
-            .groupByKey()
+        final KStream<Long, String> stream;
+        if (useRepartitioned) {
+            stream = input.repartition();
+        } else {
+            input.to(INTERMEDIATE_USER_TOPIC);
+            stream = builder.stream(INTERMEDIATE_USER_TOPIC);

Review comment:
       On re-reading, I realize I misunderstood the situation. I retract my comment :grimacing:







[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r427674857



##########
File path: streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
##########
@@ -232,10 +232,53 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @param produced the instance of Produced that gives the serdes and `StreamPartitioner`
    * @return a [[KStream]] that contains the exact same (and potentially repartitioned) records as this [[KStream]]
    * @see `org.apache.kafka.streams.kstream.KStream#through`
+   * @deprecated use `repartition()` instead
    */
+  @deprecated
   def through(topic: String)(implicit produced: Produced[K, V]): KStream[K, V] =
     new KStream(inner.through(topic, produced))
 
+  /**
+   * Materialize this stream to a topic and create a new [[KStream]] from the topic using the `Repartitioned` instance
+   * for configuration of the key `Serde`, value `Serde`, `StreamPartitioner`, number of partitions, and
+   * topic name part.
+   * <p>
+   * The created topic is considered an internal topic and is meant to be used only by the current Kafka Streams instance.
+   * Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams.
+   * The topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+   * `StreamsConfig` via parameter `APPLICATION_ID_CONFIG`,
+   * "&lt;name&gt;" is either provided via `Repartitioned#as(String)` or an internally
+   * generated name, and "-repartition" is a fixed suffix.
+   * <p>
+   * The user can either supply the `Repartitioned` instance as an implicit in scope or she can also provide implicit
+   * key and value serdes that will be converted to a `Repartitioned` instance implicitly.
+   * <p>
+   * {{{
+   * Example:
+   *
+   * // brings implicit serdes in scope
+   * import Serdes._
+   *
+   * //..
+   * val clicksPerRegion: KStream[String, Long] = //..
+   *
+   * // Implicit serdes in scope will generate an implicit Repartitioned instance, which
+   * // will be passed automatically to the call of repartition below
+   * clicksPerRegion.repartition
+   *
+   * // Similarly you can create an implicit Repartitioned and it will be passed implicitly
+   * // to the repartition call
+   * }}}
+   *
+   * @param repartitioned the `Repartitioned` instance used to specify `Serdes`, `StreamPartitioner` which determines
+   *                      how records are distributed among partitions of the topic,
+   *                      part of the topic name, and number of partitions for a repartition topic.
+   * @return a [[KStream]] that contains the exact same repartitioned records as this [[KStream]]
+   * @see `org.apache.kafka.streams.kstream.KStream#repartition`
+   */
+  def repartition(implicit repartitioned: Repartitioned[K, V]): KStream[K, V] =

Review comment:
       Not sure what we can/should test? 
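The Scaladoc quoted above names the created topic "${applicationId}-<name>-repartition". As a minimal illustration of that documented format only, the hypothetical helper below assembles the name from its parts. `RepartitionTopicNames.repartitionTopicName` is not a Kafka API: Kafka Streams builds this name internally, and it also generates `<name>` itself when `Repartitioned#as(String)` is not used, which this sketch does not model.

```java
/**
 * Hypothetical helper (not part of Kafka Streams) that mirrors the documented
 * repartition topic naming scheme: "${applicationId}-<name>-repartition".
 */
final class RepartitionTopicNames {
    /** Fixed suffix described in the Scaladoc. */
    static final String SUFFIX = "-repartition";

    private RepartitionTopicNames() { }

    /**
     * @param applicationId the value configured via APPLICATION_ID_CONFIG ("application.id")
     * @param name          the user-provided name part, e.g. from Repartitioned#as(String)
     * @return the topic name following the documented format
     */
    static String repartitionTopicName(final String applicationId, final String name) {
        if (applicationId == null || applicationId.isEmpty()) {
            throw new IllegalArgumentException("applicationId must be set");
        }
        if (name == null || name.isEmpty()) {
            // Kafka Streams would generate a name here; this sketch simply rejects it.
            throw new IllegalArgumentException("name must be non-empty");
        }
        return applicationId + "-" + name + SUFFIX;
    }
}
```

For example, an application whose `application.id` is `clicks-app` and whose `Repartitioned` is named `by-region` would, per the documented scheme, read and write the topic `clicks-app-by-region-repartition`.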







[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r426211685



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##########
@@ -213,7 +213,8 @@ private void runSimpleCopyTest(final int numberOfRestarts,
         final KStream<Long, Long> input = builder.stream(inputTopic);
         KStream<Long, Long> output = input;
         if (throughTopic != null) {
-            output = input.through(throughTopic);
+            input.to(throughTopic);
+            output = builder.stream(throughTopic);

Review comment:
       Using `to()` and `stream()` is "simpler" as we clean up topics in between (and thus avoid internal topics).
   
   We could of course also use `repartition()`.







[GitHub] [kafka] mjsax commented on pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#issuecomment-631809507


   Added the test. Will merge after Jenkins passes.





[GitHub] [kafka] chia7712 commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r428499696



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##########
@@ -835,9 +836,9 @@
      * @param topic     the topic name
      * @param produced  the options to use when producing to the topic
      * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
-     * @see #repartition()
-     * @see #repartition(Repartitioned)
+     * @deprecated since 2.6; use #repartition(Repartitioned) instead

Review comment:
       Could we use ```{@link #repartition(Repartitioned)}``` ? 
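The convention the reviewers converge on (state the version the method was deprecated in, and point to the replacement with `{@link}` so Javadoc renders a navigable reference) can be sketched on a hypothetical class. `ExampleStream` and its methods only mimic the shape of `KStream#through` and `KStream#repartition`; they are not Kafka code. The `@Deprecated` annotation is what makes the compiler warn callers, while the `@deprecated` Javadoc tag documents when it happened and what to migrate to.

```java
/**
 * Hypothetical, simplified class used only to illustrate the Javadoc
 * deprecation convention; it is NOT Kafka's KStream interface.
 */
class ExampleStream {

    /**
     * Writes records to a user-managed topic and reads them back (illustration only).
     *
     * @param topic the topic name
     * @return a label describing the call
     * @deprecated since 2.6; use {@link #repartition()} instead
     */
    @Deprecated // drives compiler warnings; the Javadoc tag tells users what to migrate to
    String through(final String topic) {
        return "through(" + topic + ")";
    }

    /**
     * Replacement operator (illustration only).
     *
     * @return a label describing the call
     */
    String repartition() {
        return "repartition()";
    }

    /** Reports whether the declared method carries @Deprecated (retained at runtime). */
    static boolean isDeprecated(final String name, final Class<?>... parameterTypes) {
        try {
            return ExampleStream.class
                .getDeclaredMethod(name, parameterTypes)
                .isAnnotationPresent(Deprecated.class);
        } catch (final NoSuchMethodException e) {
            throw new IllegalArgumentException("no such method: " + name, e);
        }
    }
}
```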







[GitHub] [kafka] vvcephei commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r427543403



##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########
@@ -285,7 +286,28 @@ public void shouldProcessViaThroughTopic() {
         assertEquals(Collections.singletonList(new KeyValueTimestamp<>("A", "aa", 0)), sourceProcessorSupplier.theCapturedProcessor().processed);
         assertEquals(Collections.singletonList(new KeyValueTimestamp<>("A", "aa", 0)), throughProcessorSupplier.theCapturedProcessor().processed);
     }
-    
+
+    @Test
+    public void shouldProcessViaRepartitionTopic() {

Review comment:
       Thanks!

##########
File path: docs/streams/upgrade-guide.html
##########
@@ -95,9 +95,11 @@ <h3><a id="streams_api_changes_260" href="#streams_api_changes_260">Streams API
         Note that you need brokers with version 2.5 or newer to use this feature.
     </p>
     <p>
-        As of 2.6.0 Kafka Streams offers a new <code>KStream.repartition()</code> operator (as per <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint">KIP-221</a>).
+        As of 2.6.0 Kafka Streams deprecates <code>KStream.through()</code> in favor of the new <code>KStream.repartition()</code> operator
+        (as per <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint">KIP-221</a>).
         <code>KStream.repartition()</code> is similar to <code>KStream.through()</code>, however Kafka Streams will manage the topic for you.
-        Refer to the <a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html">developer guide</a> for more details.
+        If you need to write into and read back from a topic you mange by your own, you can fall back to use <code>KStream.to()</code> in combination with <code>StreamsBuilder#stream()</code>.

Review comment:
       ```suggestion
           If you need to write into and read back from a topic that you manage, you can fall back to using <code>KStream.to()</code> in combination with <code>StreamsBuilder#stream()</code>.
   ```

##########
File path: docs/streams/developer-guide/dsl-api.html
##########
@@ -1763,32 +1763,23 @@ <h4><a id="streams_concepts_globalktable" href="#streams_concepts_globalktable">
                                 streams/tables of a join &#8211; it is up to the user to ensure that this is the case.</p>
                         </div>
                         <p><strong>Ensuring data co-partitioning:</strong> If the inputs of a join are not co-partitioned yet, you must ensure this manually.
-                            You may follow a procedure such as outlined below.</p>
+                            You may follow a procedure such as outlined below.
+                            It is recommended to repartitiont to topic with fewers partitions to match the larger partition number of avoid bottlenecks.

Review comment:
       ```suggestion
               It is recommended to repartition the topic with fewer partitions to match the larger partition number to avoid bottlenecks.
   ```
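The co-partitioning advice in the suggestion above can be illustrated with a small sketch, assuming a simplified key-to-partition function: `partitionFor` uses `String#hashCode` modulo the partition count as a stand-in, whereas Kafka's default partitioner applies murmur2 to the serialized key. The class and method names are hypothetical. The point it shows is that the same key only lands on the same partition index on both join inputs when the partition counts match, and that targeting the larger count avoids shrinking the parallelism of the side that already has more partitions.

```java
/**
 * Hypothetical illustration of the co-partitioning advice (not Kafka code).
 * partitionFor() is a simplified stand-in: String#hashCode modulo the count.
 */
final class CoPartitioning {
    private CoPartitioning() { }

    /** Partition count both join inputs should end up with: the larger one. */
    static int targetPartitionCount(final int leftPartitions, final int rightPartitions) {
        if (leftPartitions <= 0 || rightPartitions <= 0) {
            throw new IllegalArgumentException("partition counts must be positive");
        }
        return Math.max(leftPartitions, rightPartitions);
    }

    /** True if the left input is the one to repartition (it has fewer partitions). */
    static boolean shouldRepartitionLeft(final int leftPartitions, final int rightPartitions) {
        return leftPartitions < rightPartitions;
    }

    /** Same key and same partition count always give the same partition index. */
    static int partitionFor(final String key, final int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}
```

With 3 versus 6 partitions, the key "A" (whose `hashCode()` is 65) maps to partition 2 on one side and partition 5 on the other under this simplified mapping, so the two records would not meet in the same join task; after repartitioning the 3-partition input to 6 partitions, both sides map "A" to partition 5.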

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1082,7 +1081,7 @@ public void cleanUp() {
      * This will use the default Kafka Streams partitioner to locate the partition.
      * If a {@link StreamPartitioner custom partitioner} has been
      * {@link ProducerConfig#PARTITIONER_CLASS_CONFIG configured} via {@link StreamsConfig} or
-     * {@link KStream#through(String, Produced)}, or if the original {@link KTable}'s input
+     * {@link KStream#repartition(Repartitioned)}, or if the original {@link KTable}'s input

Review comment:
       Might as well make this update, since we may remove the methods at different times.

##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########
@@ -265,6 +265,7 @@ public void shouldProcessingFromSinkTopic() {
                  processorSupplier.theCapturedProcessor().processed);
     }
 
+    @SuppressWarnings("deprecation")

Review comment:
       My opinion is that it's generally better not to suppress but instead just deprecate this method as well. It's not really that important for tests, since no one else is going to call the method, so feel free to take or leave the advice.

##########
File path: docs/streams/upgrade-guide.html
##########
@@ -95,9 +95,11 @@ <h3><a id="streams_api_changes_260" href="#streams_api_changes_260">Streams API
         Note that you need brokers with version 2.5 or newer to use this feature.
     </p>
     <p>
-        As of 2.6.0 Kafka Streams offers a new <code>KStream.repartition()</code> operator (as per <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint">KIP-221</a>).
+        As of 2.6.0 Kafka Streams deprecates <code>KStream.through()</code> in favor of the new <code>KStream.repartition()</code> operator
+        (as per <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint">KIP-221</a>).
         <code>KStream.repartition()</code> is similar to <code>KStream.through()</code>, however Kafka Streams will manage the topic for you.
-        Refer to the <a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html">developer guide</a> for more details.
+        If you need to write into and read back from a topic you mange by your own, you can fall back to use <code>KStream.to()</code> in combination with <code>StreamsBuilder#stream()</code>.
+        We refer to the <a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html">developer guide</a> for more details about <code>KStream.repartition()</code>.

Review comment:
       ```suggestion
           Please refer to the <a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html">developer guide</a> for more details about <code>KStream.repartition()</code>.
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/RepartitionedInternal.java
##########
@@ -21,33 +21,33 @@
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
 
-class RepartitionedInternal<K, V> extends Repartitioned<K, V> {
+public class RepartitionedInternal<K, V> extends Repartitioned<K, V> {

Review comment:
       It's worth noting that it only needs to be visible for the scala _tests_ that verify the scala Repartitioned builder results in a correctly configured object. For the public API, we only convert a scala Repartitioned to a java Repartitioned.

##########
File path: streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
##########
@@ -232,10 +232,53 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @param produced the instance of Produced that gives the serdes and `StreamPartitioner`
    * @return a [[KStream]] that contains the exact same (and potentially repartitioned) records as this [[KStream]]
    * @see `org.apache.kafka.streams.kstream.KStream#through`
+   * @deprecated use `repartition()` instead
    */
+  @deprecated
   def through(topic: String)(implicit produced: Produced[K, V]): KStream[K, V] =
     new KStream(inner.through(topic, produced))
 
+  /**
+   * Materialize this stream to a topic and create a new [[KStream]] from the topic using the `Repartitioned` instance
+   * for configuration of the key `Serde`, value `Serde`, `StreamPartitioner`, number of partitions, and
+   * topic name part.
+   * <p>
+   * The created topic is considered an internal topic and is meant to be used only by the current Kafka Streams instance.
+   * Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams.
+   * The topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+   * `StreamsConfig` via parameter `APPLICATION_ID_CONFIG`,
+   * "&lt;name&gt;" is either provided via `Repartitioned#as(String)` or an internally
+   * generated name, and "-repartition" is a fixed suffix.
+   * <p>
+   * The user can either supply the `Repartitioned` instance as an implicit in scope or she can also provide implicit
+   * key and value serdes that will be converted to a `Repartitioned` instance implicitly.
+   * <p>
+   * {{{
+   * Example:
+   *
+   * // brings implicit serdes in scope
+   * import Serdes._
+   *
+   * //..
+   * val clicksPerRegion: KStream[String, Long] = //..
+   *
+   * // Implicit serdes in scope will generate an implicit Repartitioned instance, which
+   * // will be passed automatically to the call of repartition below
+   * clicksPerRegion.repartition
+   *
+   * // Similarly you can create an implicit Repartitioned and it will be passed implicitly
+   * // to the repartition call
+   * }}}
+   *
+   * @param repartitioned the `Repartitioned` instance used to specify `Serdes`, `StreamPartitioner` which determines
+   *                      how records are distributed among partitions of the topic,
+   *                      part of the topic name, and number of partitions for a repartition topic.
+   * @return a [[KStream]] that contains the exact same repartitioned records as this [[KStream]]
+   * @see `org.apache.kafka.streams.kstream.KStream#repartition`
+   */
+  def repartition()(implicit repartitioned: Repartitioned[K, V]): KStream[K, V] =

Review comment:
       I think we'd prefer:
   ```suggestion
     def repartition(implicit repartitioned: Repartitioned[K, V]): KStream[K, V] =
   ```
   similar to groupByKey, although I'm admittedly not sure if it actually matters.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##########
@@ -484,8 +493,14 @@ private Topology setupTopologyWithIntermediateUserTopic(final String outputTopic
             .toStream()
             .to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long()));
 
-        input.through(INTERMEDIATE_USER_TOPIC)
-            .groupByKey()
+        final KStream<Long, String> stream;
+        if (useRepartitioned) {
+            stream = input.repartition();
+        } else {
+            input.to(INTERMEDIATE_USER_TOPIC);
+            stream = builder.stream(INTERMEDIATE_USER_TOPIC);

Review comment:
       I'm wondering if we should continue testing with `through`, to ensure it continues to work. WDYT?

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
##########
@@ -254,6 +253,7 @@ private Topology getTopologyWithChangingValuesAfterChangingKey(final String opti
 
     }
 
+    @SuppressWarnings("deprecation") // specifically testing the deprecated variant

Review comment:
       This would be a case where I would advocate more strongly to deprecate _this_ method, to avoid accidentally "hiding" the deprecation from callers.
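The trade-off raised here (suppressing the warning versus deprecating the helper itself) can be sketched with hypothetical classes; `LegacyApi` and `TopologyHelpers` are illustrations, not Kafka code. Per the Java Language Specification, a use of a deprecated member does not produce a warning when it occurs inside an entity that is itself annotated `@Deprecated`, so deprecating the helper keeps its own body quiet while still warning every caller of the helper. `@SuppressWarnings("deprecation")` silences the helper body too, but callers are never told that the helper depends on a deprecated API.

```java
/** Hypothetical legacy API (illustration only; not a Kafka class). */
class LegacyApi {
    /**
     * @param x input value
     * @return x + 1
     * @deprecated since 2.6; use {@link #newCall(int)} instead
     */
    @Deprecated
    static int oldCall(final int x) {
        return x + 1;
    }

    /**
     * @param x input value
     * @return x + 1
     */
    static int newCall(final int x) {
        return x + 1;
    }
}

/** Two ways a test helper can call the deprecated method on purpose. */
class TopologyHelpers {
    // Option 1: suppress the warning. Callers of this helper are not warned,
    // so its dependency on a deprecated API is hidden from them.
    @SuppressWarnings("deprecation")
    static int viaSuppression(final int x) {
        return LegacyApi.oldCall(x);
    }

    // Option 2: deprecate the helper as well. No warning is raised inside this
    // method, but every caller of viaDeprecation() now gets a deprecation warning,
    // so the deprecation stays visible.
    @Deprecated
    static int viaDeprecation(final int x) {
        return LegacyApi.oldCall(x);
    }
}
```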

##########
File path: streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
##########
@@ -218,7 +218,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * import Serdes._
    *
    * //..
-   * val clicksPerRegion: KTable[String, Long] = //..
+   * val clicksPerRegion: KStream[String, Long] = //..

Review comment:
       Oops...







[GitHub] [kafka] mjsax commented on pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#issuecomment-632363696


   Java 8 passed.
   Java 11:
   ```
   org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]
   ```
   Java 14:
   ```
   org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]
   ```





[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r426211624



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##########
@@ -484,8 +493,14 @@ private Topology setupTopologyWithIntermediateUserTopic(final String outputTopic
             .toStream()
             .to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long()));
 
-        input.through(INTERMEDIATE_USER_TOPIC)
-            .groupByKey()
+        final KStream<Long, String> stream;
+        if (useRepartitioned) {
+            stream = input.repartition();
+        } else {
+            input.to(INTERMEDIATE_USER_TOPIC);
+            stream = builder.stream(INTERMEDIATE_USER_TOPIC);

Review comment:
       We still need to test this, because topics using this pattern are still considered _intermediate_ topics, and the `--intermediate-topics` flag in `StreamsResetter` is still useful and not changed.







[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r427578311



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##########
@@ -484,8 +493,14 @@ private Topology setupTopologyWithIntermediateUserTopic(final String outputTopic
             .toStream()
             .to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long()));
 
-        input.through(INTERMEDIATE_USER_TOPIC)
-            .groupByKey()
+        final KStream<Long, String> stream;
+        if (useRepartitioned) {
+            stream = input.repartition();
+        } else {
+            input.to(INTERMEDIATE_USER_TOPIC);
+            stream = builder.stream(INTERMEDIATE_USER_TOPIC);

Review comment:
       Well, `through()` is literally implemented as `to()` + `stream()`... But I can revert and add a suppress annotation, too.







[GitHub] [kafka] vvcephei commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r428055668



##########
File path: streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
##########
@@ -232,10 +232,53 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @param produced the instance of Produced that gives the serdes and `StreamPartitioner`
    * @return a [[KStream]] that contains the exact same (and potentially repartitioned) records as this [[KStream]]
    * @see `org.apache.kafka.streams.kstream.KStream#through`
+   * @deprecated use `repartition()` instead
    */
+  @deprecated
   def through(topic: String)(implicit produced: Produced[K, V]): KStream[K, V] =
     new KStream(inner.through(topic, produced))
 
+  /**
+   * Materialize this stream to a topic and creates a new [[KStream]] from the topic using the `Repartitioned` instance
+   * for configuration of the `Serde key serde`, `Serde value serde`, `StreamPartitioner`, number of partitions, and
+   * topic name part.
+   * <p>
+   * The created topic is considered as an internal topic and is meant to be used only by the current Kafka Streams instance.
+   * Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams.
+   * The topic will be named as "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+   * `StreamsConfig` via parameter `APPLICATION_ID_CONFIG`,
+   * "&lt;name&gt;" is either provided via `Repartitioned#as(String)` or an internally
+   * generated name, and "-repartition" is a fixed suffix.
+   * <p>
+   * The user can either supply the `Repartitioned` instance as an implicit in scope or she can also provide implicit
+   * key and value serdes that will be converted to a `Repartitioned` instance implicitly.
+   * <p>
+   * {{{
+   * Example:
+   *
+   * // brings implicit serdes in scope
+   * import Serdes._
+   *
+   * //..
+   * val clicksPerRegion: KStream[String, Long] = //..
+   *
+   * // Implicit serdes in scope will generate an implicit Repartitioned instance, which
+   * // will be passed automatically to the call of repartition below
+   * clicksPerRegion.repartition
+   *
+   * // Similarly you can create an implicit Repartitioned and it will be passed implicitly
+   * // to the repartition call
+   * }}}
+   *
+   * @param repartitioned the `Repartitioned` instance used to specify `Serdes`, `StreamPartitioner` which determines
+   *                      how records are distributed among partitions of the topic,
+   *                      part of the topic name, and number of partitions for a repartition topic.
+   * @return a [[KStream]] that contains the exact same repartitioned records as this [[KStream]]
+   * @see `org.apache.kafka.streams.kstream.KStream#repartition`
+   */
+  def repartition(implicit repartitioned: Repartitioned[K, V]): KStream[K, V] =

Review comment:
       We have previously had embarrassing bugs like, "It's not possible to write code that compiles using the Scala DSL". If we had had even the most trivial test written for those DSL methods, we would never have released those bugs. So, I'd just recommend creating _any_ topology that contains this operator and maybe using TTD to pipe a single record through it to ensure that it doesn't throw any runtime exceptions when you use it. 
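A note on the naming rule quoted in the Scaladoc above: the topic that `repartition` creates is named `${applicationId}-<name>-repartition`. The sketch below spells that rule out as a plain Java helper, which can be handy when a test needs to predict the topic name. `RepartitionTopicNames` and `repartitionTopicName` are hypothetical (Kafka Streams builds the name internally), and the sketch does not model the internally generated name used when no name is passed via `Repartitioned#as(String)`.

```java
import java.util.Objects;

// Hypothetical helper, not part of the Kafka Streams API: composes a repartition
// topic name following the documented rule "${applicationId}-<name>-repartition".
public final class RepartitionTopicNames {

    private static final String SUFFIX = "-repartition";

    private RepartitionTopicNames() { }

    public static String repartitionTopicName(final String applicationId, final String name) {
        Objects.requireNonNull(applicationId, "applicationId can't be null");
        Objects.requireNonNull(name, "name can't be null");
        return applicationId + "-" + name + SUFFIX;
    }

    public static void main(final String[] args) {
        // e.g. application.id = "clicks-app" and Repartitioned.as("by-region")
        System.out.println(repartitionTopicName("clicks-app", "by-region")); // clicks-app-by-region-repartition
    }
}
```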







[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r426211719



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
##########
@@ -1393,6 +1409,11 @@ public void shouldPreserveSerdesForOperators() {
         assertEquals(((AbstractStream) stream1.through("topic-3", Produced.with(mySerde, mySerde))).keySerde(), mySerde);
         assertEquals(((AbstractStream) stream1.through("topic-3", Produced.with(mySerde, mySerde))).valueSerde(), mySerde);
 
+        assertEquals(((AbstractStream) stream1.repartition()).keySerde(), consumedInternal.keySerde());
+        assertEquals(((AbstractStream) stream1.repartition()).valueSerde(), consumedInternal.valueSerde());
+        assertEquals(((AbstractStream) stream1.repartition(Repartitioned.with(mySerde, mySerde))).keySerde(), mySerde);
+        assertEquals(((AbstractStream) stream1.repartition(Repartitioned.with(mySerde, mySerde))).valueSerde(), mySerde);

Review comment:
       replicating test cases

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
##########
@@ -1452,6 +1474,24 @@ public void shouldUseRecordMetadataTimestampExtractorWithThrough() {
         assertNull(processorTopology.source("topic-1").getTimestampExtractor());
     }
 
+    @Test
+    public void shouldUseRecordMetadataTimestampExtractorWithRepartition() {

Review comment:
       replicating test

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
##########
@@ -1467,6 +1507,21 @@ public void shouldSendDataThroughTopicUsingProduced() {
         assertThat(processorSupplier.theCapturedProcessor().processed, equalTo(Collections.singletonList(new KeyValueTimestamp<>("a", "b", 0))));
     }
 
+    @Test
+    public void shouldSendDataThroughRepartitionTopicUsingRepartitioned() {

Review comment:
       replicating test







[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r427579474



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##########
@@ -815,9 +815,10 @@
      *
      * @param topic the topic name
      * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
-     * @see #repartition()
-     * @see #repartition(Repartitioned)
+     * @deprecated use {@link #repartition()} instead

Review comment:
       Not sure why. If I use 2.6, why do I care whether it was deprecated in 2.4, 2.2, or 2.6? It's deprecated in the version I use now. Why would I care about older versions?







[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r426211425



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##########
@@ -846,16 +847,13 @@
      * from the auto-generated topic using default serializers, deserializers, and producer's {@link DefaultPartitioner}.
      * The number of partitions is determined based on the upstream topics partition numbers.
      * <p>
-     * This operation is similar to {@link #through(String)}, however, Kafka Streams manages the used topic automatically.

Review comment:
       Not 100% sure if we should remove this now, or when we remove `through()`?







[GitHub] [kafka] vvcephei commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r427621177



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##########
@@ -815,9 +815,10 @@
      *
      * @param topic the topic name
      * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
-     * @see #repartition()
-     * @see #repartition(Repartitioned)
+     * @deprecated use {@link #repartition()} instead

Review comment:
       For one thing, it's nice for _us_, so we can easily tell when it's been deprecated "long enough" to remove. I can recall trudging through git history in the past to figure this out.
   
   For users, maybe you don't care, but I personally find it nice when my libraries do this for me. It's just good bookkeeping, and it gives me some confidence that the maintainers are doing proper, tidy maintenance.
   
   If it provides a "third party" supporting opinion, the Scala language designers thought this was important enough to build it in as a separate field of the "deprecated" annotation: https://docs.scala-lang.org/tour/annotations.html
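For comparison on the Java side: since Java 9, `java.lang.Deprecated` has a `since` element (plus `forRemoval`), the counterpart of the `since` argument of Scala's `@deprecated`. A minimal sketch, assuming a Java 9+ compiler; with Java 8 (Kafka's CI still ran Java 8 for this PR) only the Javadoc `@deprecated` text can carry the version. `DeprecationSinceExample` and `LegacyApi` are hypothetical stand-ins, not Kafka classes. Because `@Deprecated` has runtime retention, the version can even be read back reflectively.

```java
import java.lang.reflect.Method;

public final class DeprecationSinceExample {

    // Hypothetical API shaped like KStream#through vs. KStream#repartition.
    public static class LegacyApi {
        /**
         * @deprecated since 2.6; use {@link #repartition()} instead.
         */
        @Deprecated(since = "2.6") // Java 9+: the version is part of the annotation itself
        public String through() {
            return repartition();
        }

        public String repartition() {
            return "repartitioned";
        }
    }

    // Returns the "since" value of a public no-arg method, or null if it is not deprecated.
    public static String deprecatedSince(final Class<?> type, final String methodName) {
        try {
            final Method method = type.getMethod(methodName);
            final Deprecated deprecated = method.getAnnotation(Deprecated.class);
            return deprecated == null ? null : deprecated.since();
        } catch (final NoSuchMethodException e) {
            throw new IllegalArgumentException("no public no-arg method " + methodName, e);
        }
    }

    public static void main(final String[] args) {
        System.out.println(deprecatedSince(LegacyApi.class, "through"));     // 2.6
        System.out.println(deprecatedSince(LegacyApi.class, "repartition")); // null
    }
}
```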








[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r426211526



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/RepartitionedInternal.java
##########
@@ -21,33 +21,33 @@
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
 
-class RepartitionedInternal<K, V> extends Repartitioned<K, V> {
+public class RepartitionedInternal<K, V> extends Repartitioned<K, V> {

Review comment:
       Must be public to be visible in Scala







[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r426211787



##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
##########
@@ -132,7 +132,8 @@ private KafkaStreams createKafkaStreams(final Properties props) {
             .to("sum", Produced.with(stringSerde, longSerde));
 
         if (withRepartitioning) {
-            final KStream<String, Integer> repartitionedData = data.through("repartition");
+            data.to("repartition");
+            final KStream<String, Integer> repartitionedData = builder.stream("repartition");

Review comment:
       As above. Avoid internal topics.







[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r426211555



##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########
@@ -285,7 +286,28 @@ public void shouldProcessViaThroughTopic() {
         assertEquals(Collections.singletonList(new KeyValueTimestamp<>("A", "aa", 0)), sourceProcessorSupplier.theCapturedProcessor().processed);
         assertEquals(Collections.singletonList(new KeyValueTimestamp<>("A", "aa", 0)), throughProcessorSupplier.theCapturedProcessor().processed);
     }
-    
+
+    @Test
+    public void shouldProcessViaRepartitionTopic() {

Review comment:
       Replicated the test for `through()` for `repartition()`.







[GitHub] [kafka] vvcephei commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r427623832



##########
File path: streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
##########
@@ -232,10 +232,53 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @param produced the instance of Produced that gives the serdes and `StreamPartitioner`
    * @return a [[KStream]] that contains the exact same (and potentially repartitioned) records as this [[KStream]]
    * @see `org.apache.kafka.streams.kstream.KStream#through`
+   * @deprecated use `repartition()` instead
    */
+  @deprecated

Review comment:
       We should provide the arguments to the annotation to explain the deprecation. This gives better IDE integration than the ScalaDoc (which we won't need anymore):
   
   ```suggestion
     @deprecated("use `repartition()` instead", "2.6.0")
   ```

##########
File path: streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
##########
@@ -232,10 +232,53 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @param produced the instance of Produced that gives the serdes and `StreamPartitioner`
    * @return a [[KStream]] that contains the exact same (and potentially repartitioned) records as this [[KStream]]
    * @see `org.apache.kafka.streams.kstream.KStream#through`
+   * @deprecated use `repartition()` instead
    */
+  @deprecated
   def through(topic: String)(implicit produced: Produced[K, V]): KStream[K, V] =
     new KStream(inner.through(topic, produced))
 
+  /**
+   * Materialize this stream to a topic and create a new [[KStream]] from the topic using the `Repartitioned` instance
+   * for configuration of the `Serde key serde`, `Serde value serde`, `StreamPartitioner`, number of partitions, and
+   * topic name part.
+   * <p>
+   * The created topic is considered as an internal topic and is meant to be used only by the current Kafka Streams instance.
+   * Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams.
+   * The topic will be named as "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+   * `StreamsConfig` via parameter `APPLICATION_ID_CONFIG`,
+   * "&lt;name&gt;" is either provided via `Repartitioned#as(String)` or an internally
+   * generated name, and "-repartition" is a fixed suffix.
+   * <p>
+   * The user can either supply the `Repartitioned` instance as an implicit in scope or she can also provide implicit
+   * key and value serdes that will be converted to a `Repartitioned` instance implicitly.
+   * <p>
+   * {{{
+   * Example:
+   *
+   * // brings implicit serdes in scope
+   * import Serdes._
+   *
+   * //..
+   * val clicksPerRegion: KStream[String, Long] = //..
+   *
+   * // Implicit serdes in scope will generate an implicit Repartitioned instance, which
+   * // will be passed automatically to the call of repartition below
+   * clicksPerRegion.repartition
+   *
+   * // Similarly you can create an implicit Repartitioned and it will be passed implicitly
+   * // to the repartition call
+   * }}}
+   *
+   * @param repartitioned the `Repartitioned` instance used to specify `Serdes`, `StreamPartitioner` which determines
+   *                      how records are distributed among partitions of the topic,
+   *                      part of the topic name, and number of partitions for a repartition topic.
+   * @return a [[KStream]] that contains the exact same repartitioned records as this [[KStream]]
+   * @see `org.apache.kafka.streams.kstream.KStream#repartition`
+   */
+  def repartition(implicit repartitioned: Repartitioned[K, V]): KStream[K, V] =

Review comment:
       I just noticed that we have no test for this operator (or for through). Should we add one?







[GitHub] [kafka] vvcephei commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r427621713



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
##########
@@ -254,6 +253,7 @@ private Topology getTopologyWithChangingValuesAfterChangingKey(final String opti
 
     }
 
+    @SuppressWarnings("deprecation") // specifically testing the deprecated variant

Review comment:
       This is exactly the point!







[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r426211408



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1082,7 +1081,7 @@ public void cleanUp() {
      * This will use the default Kafka Streams partitioner to locate the partition.
      * If a {@link StreamPartitioner custom partitioner} has been
      * {@link ProducerConfig#PARTITIONER_CLASS_CONFIG configured} via {@link StreamsConfig} or
-     * {@link KStream#through(String, Produced)}, or if the original {@link KTable}'s input
+     * {@link KStream#repartition(Repartitioned)}, or if the original {@link KTable}'s input

Review comment:
       Not sure if this update is necessary. This method is deprecated itself.
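Background on the Javadoc line in this hunk ("This will use the default Kafka Streams partitioner to locate the partition"): for a record with a non-null key, the default partitioning hashes the serialized key with murmur2, clears the sign bit, and takes the result modulo the partition count. The sketch below reimplements that computation in plain Java so it runs without Kafka on the classpath. It is written to mirror `org.apache.kafka.common.utils.Utils#murmur2` and `Utils#toPositive`, but treat it as an illustration and call Kafka's own classes in real code; `KeyPartitionSketch` and `partitionForKey` are hypothetical names, and the UTF-8 encoding assumes a `StringSerializer`-style key serde.

```java
import java.nio.charset.StandardCharsets;

public final class KeyPartitionSketch {

    private KeyPartitionSketch() { }

    // 32-bit murmur2 hash, written to mirror the variant used for keyed records.
    public static int murmur2(final byte[] data) {
        final int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;

        int h = seed ^ length;
        final int length4 = length / 4;

        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Mix in the trailing 1 to 3 bytes; the cases intentionally fall through.
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
            default:
                break;
        }

        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Clears the sign bit so the modulo below can never be negative.
    public static int toPositive(final int number) {
        return number & 0x7fffffff;
    }

    // Partition for a String key serialized as UTF-8.
    public static int partitionForKey(final String key, final int numPartitions) {
        final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return toPositive(murmur2(keyBytes)) % numPartitions;
    }

    public static void main(final String[] args) {
        System.out.println(partitionForKey("user-42", 6));
    }
}
```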







[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r426211270



##########
File path: docs/streams/developer-guide/dsl-api.html
##########
@@ -3679,58 +3670,6 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable Foreign-Key
 <span class="c1">// Write the stream to the output topic, using explicit key and value serdes,</span>
 <span class="c1">// (thus overriding the defaults in the config properties).</span>
 <span class="n">stream</span><span class="o">.</span><span class="na">to</span><span class="o">(</span><span class="s">&quot;my-stream-output-topic&quot;</span><span class="o">,</span> <span class="n">Produced</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">());</span>
-</pre></div>
-                        </div>
-                        <p><strong>Causes data re-partitioning if any of the following conditions is true:</strong></p>
-                        <ol class="last arabic simple">
-                            <li>If the output topic has a different number of partitions than the stream/table.</li>
-                            <li>If the <code class="docutils literal"><span class="pre">KStream</span></code> was marked for re-partitioning.</li>
-                            <li>If you provide a custom <code class="docutils literal"><span class="pre">StreamPartitioner</span></code> to explicitly control how to distribute the output records
-                                across the partitions of the output topic.</li>
-                            <li>If the key of an output record is <code class="docutils literal"><span class="pre">null</span></code>.</li>
-                        </ol>
-                    </td>
-                </tr>
-                <tr class="row-odd"><td><p class="first"><strong>Through</strong></p>

Review comment:
       The diff is weird because the part above repeats below. The actual deletion starts here.







[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r426211457



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##########
@@ -925,9 +920,8 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
      * Convert this stream to a {@link KTable}.
      * <p>
      * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
-     * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)}, or
-     * {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via
-     * {@link #through(String)}) an internal repartitioning topic will be created in Kafka.
+     * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
+     * {@link #transform(TransformerSupplier, String...)}) an internal repartitioning topic will be created in Kafka.

Review comment:
       Just simplifying this one.







[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r426211984



##########
File path: streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
##########
@@ -218,7 +218,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * import Serdes._
    *
    * //..
-   * val clicksPerRegion: KTable[String, Long] = //..
+   * val clicksPerRegion: KStream[String, Long] = //..

Review comment:
       There is no `KTable#through()` method.







[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r426212066



##########
File path: streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/ProducedTest.scala
##########
@@ -37,15 +37,15 @@ class ProducedTest extends FlatSpec with Matchers {
     internalProduced.valueSerde.getClass shouldBe Serdes.Long.getClass
   }
 
-  "Create a Produced with timestampExtractor and resetPolicy" should "create a Consumed with Serdes, timestampExtractor and resetPolicy" in {
+  "Create a Produced with streamPartitioner" should "create a Produced with Serdes and streamPartitioner" in {

Review comment:
       Side cleanup (was originally copied from `ConsumedTest` but not updated correctly)







[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r426214228



##########
File path: streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Repartitioned.scala
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.kstream
+
+import org.apache.kafka.common.serialization.Serde
+import org.apache.kafka.streams.kstream.{Repartitioned => RepartitionedJ}
+import org.apache.kafka.streams.processor.StreamPartitioner
+
+object Repartitioned {
+
+  /**
+   * Create a Repartitioned instance with provided keySerde and valueSerde.
+   *
+   * @tparam K         key type
+   * @tparam V         value type
+   * @param keySerde    Serde to use for serializing the key
+   * @param valueSerde  Serde to use for serializing the value
+   * @return A new [[Repartitioned]] instance configured with keySerde and valueSerde
+   * @see KStream#repartition(Repartitioned)
+   */
+  def `with`[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): RepartitionedJ[K, V] =

Review comment:
       I just named all methods `with`, in alignment with the other Scala helper classes.
   
   I also noticed that all helper classes only have static methods... Is that by design? It seems we are missing something here: if there is more than one optional parameter, shouldn't we have non-static methods to allow method chaining? (Could be fixed in a follow-up PR.)
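To illustrate the question: a helper with only static factory methods can set one optional parameter per call, while instance "wither" methods that return a modified copy allow chaining, as in the Java `Repartitioned.as(...).withNumberOfPartitions(...)`. A minimal plain-Java sketch of the pattern; `RepartitionSpec` and its members are hypothetical and only stand in for a config object with several optional parameters.

```java
import java.util.Optional;

// Hypothetical immutable config object with two optional parameters.
public final class RepartitionSpec {

    private final String name;                // null: let the library generate a name
    private final Integer numberOfPartitions; // null: use the upstream partition count

    private RepartitionSpec(final String name, final Integer numberOfPartitions) {
        this.name = name;
        this.numberOfPartitions = numberOfPartitions;
    }

    // Static factories: convenient entry points, one optional parameter each.
    public static RepartitionSpec as(final String name) {
        return new RepartitionSpec(name, null);
    }

    public static RepartitionSpec numberOfPartitions(final int numberOfPartitions) {
        return new RepartitionSpec(null, numberOfPartitions);
    }

    // Instance methods return a modified copy, so calls can be chained
    // without mutating the original object.
    public RepartitionSpec withName(final String name) {
        return new RepartitionSpec(name, numberOfPartitions);
    }

    public RepartitionSpec withNumberOfPartitions(final int numberOfPartitions) {
        return new RepartitionSpec(name, numberOfPartitions);
    }

    public Optional<String> name() {
        return Optional.ofNullable(name);
    }

    public Optional<Integer> partitionCount() {
        return Optional.ofNullable(numberOfPartitions);
    }

    public static void main(final String[] args) {
        // With static factories alone, setting both options would need a dedicated
        // two-argument factory; chaining composes them instead.
        final RepartitionSpec spec = RepartitionSpec.as("by-region").withNumberOfPartitions(6);
        System.out.println(spec.name().orElse("<generated>") + " / " + spec.partitionCount().orElse(-1)); // by-region / 6
    }
}
```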







[GitHub] [kafka] mjsax commented on pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#issuecomment-632759415


   Java 8 and Java 11 passed.
   Java 14:
   ```
   org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   ```





[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r426211688



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
##########
@@ -42,7 +42,7 @@
 
     private static final String TEST_ID = "reset-with-ssl-integration-test";
 
-    private static Map<String, Object> sslConfig;
+    private static final Map<String, Object> SSL_CONFIG;

Review comment:
       side cleanup
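On the side cleanup: turning `sslConfig` into `private static final Map<String, Object> SSL_CONFIG;` leaves a blank `final` field, which only compiles if a static initializer assigns it exactly once. A minimal sketch of that idiom; the class name and the two config entries are hypothetical placeholders, not the values the test actually uses.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public final class StaticFinalConfigExample {

    // Blank final: no initializer here, so the compiler requires exactly one
    // assignment in the static initializer below.
    private static final Map<String, Object> SSL_CONFIG;

    static {
        final Map<String, Object> config = new HashMap<>();
        config.put("security.protocol", "SSL");                     // placeholder value
        config.put("ssl.truststore.location", "/tmp/truststore.jks"); // placeholder value
        // "final" only fixes the reference; wrapping also makes the contents read-only.
        SSL_CONFIG = Collections.unmodifiableMap(config);
    }

    private StaticFinalConfigExample() { }

    public static Map<String, Object> sslConfig() {
        return SSL_CONFIG;
    }

    public static void main(final String[] args) {
        System.out.println(sslConfig().get("security.protocol")); // SSL
    }
}
```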

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
##########
@@ -588,6 +592,14 @@ public void shouldNotAllowNullTopicOnTo() {
         assertThat(exception.getMessage(), equalTo("topic can't be null"));
     }
 
+    @Test
+    public void shouldNotAllowNullRepartitionedOnRepartition() {

Review comment:
       replicating test







[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r427581563



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
##########
@@ -254,6 +253,7 @@ private Topology getTopologyWithChangingValuesAfterChangingKey(final String opti
 
     }
 
+    @SuppressWarnings("deprecation") // specifically testing the deprecated variant

Review comment:
       Well, but then we need to add more suppression or deprecation upstream. It does not seem worth it for test code.
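The trade-off mjsax describes follows from how deprecation warnings are scoped: `@SuppressWarnings("deprecation")` covers only the annotated element, and javac does not warn about uses inside a method that is itself `@Deprecated`, but every caller of that method then gets the warning instead. A minimal plain-Java sketch that puts the suppression on the one helper that needs it; `OldApi`, `SuppressionScopeExample` and their methods are hypothetical. Warnings are a compile-time matter (visible with `javac -Xlint:deprecation`), so running the sketch only exercises the calls.

```java
// Hypothetical library class with a deprecated method. It is a separate top-level
// class because javac does not warn about uses within the same outermost class.
class OldApi {
    @Deprecated
    public static String through(final String value) {
        return value;
    }

    public static String repartition(final String value) {
        return value;
    }
}

public final class SuppressionScopeExample {

    private SuppressionScopeExample() { }

    // The suppression covers only this method; its callers use nothing deprecated,
    // so they need no annotation of their own.
    @SuppressWarnings("deprecation") // specifically exercising the deprecated variant
    public static String buildWithThrough(final String value) {
        return OldApi.through(value);
    }

    // Marking buildWithThrough @Deprecated instead would silence the warning inside it
    // but raise one at every caller outside this class.

    public static String buildWithRepartition(final String value) {
        return OldApi.repartition(value);
    }

    public static void main(final String[] args) {
        System.out.println(buildWithThrough("a") + buildWithRepartition("b")); // ab
    }
}
```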



