You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2021/05/27 09:32:22 UTC

[kafka] branch trunk updated: KAFKA-12796: Removal of deprecated classes under streams-scala (#10710)

This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a02b19c  KAFKA-12796: Removal of deprecated classes under streams-scala (#10710)
a02b19c is described below

commit a02b19cb77084a573a25be1b75fc195b9a9c1f9b
Author: Josep Prat <jo...@aiven.io>
AuthorDate: Thu May 27 11:30:15 2021 +0200

    KAFKA-12796: Removal of deprecated classes under streams-scala (#10710)
    
    Removes streams-scala classes that were deprecated by earlier KIPs
    
    Reviewers: Bruno Cadonna <ca...@apache.org>
---
 gradle/spotbugs-exclude.xml                        |   6 -
 .../kafka/streams/scala/FunctionConversions.scala  |  83 -------------
 .../kafka/streams/scala/kstream/KTable.scala       |   2 +-
 .../kafka/streams/scala/kstream/Suppressed.scala   | 128 ---------------------
 .../streams/scala/kstream/SuppressedTest.scala     |  93 ---------------
 5 files changed, 1 insertion(+), 311 deletions(-)

diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index ab60dfd..a4e894c 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -423,12 +423,6 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
 
     <Match>
         <Package name="org.apache.kafka.streams.scala"/>
-        <Source name="FunctionConversions.scala"/>
-        <Bug pattern="EQ_UNUSUAL"/>
-    </Match>
-
-    <Match>
-        <Package name="org.apache.kafka.streams.scala"/>
         <Source name="FunctionsCompatConversions.scala"/>
         <Bug pattern="EQ_UNUSUAL"/>
     </Match>
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
deleted file mode 100644
index 892419a..0000000
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.scala
-
-import org.apache.kafka.streams.KeyValue
-import org.apache.kafka.streams.kstream._
-import scala.jdk.CollectionConverters._
-import java.lang.{Iterable => JIterable}
-
-@deprecated("This object is for internal use only", since = "2.1.0")
-object FunctionConversions {
-
-  implicit private[scala] class ForeachActionFromFunction[K, V](val p: (K, V) => Unit) extends AnyVal {
-    def asForeachAction: ForeachAction[K, V] = (key, value) => p(key, value)
-  }
-
-  implicit class PredicateFromFunction[K, V](val p: (K, V) => Boolean) extends AnyVal {
-    def asPredicate: Predicate[K, V] = (key: K, value: V) => p(key, value)
-  }
-
-  implicit class MapperFromFunction[T, U, VR](val f: (T, U) => VR) extends AnyVal {
-    def asKeyValueMapper: KeyValueMapper[T, U, VR] = (key: T, value: U) => f(key, value)
-    def asValueJoiner: ValueJoiner[T, U, VR] = (value1: T, value2: U) => f(value1, value2)
-  }
-
-  implicit class KeyValueMapperFromFunction[K, V, KR, VR](val f: (K, V) => (KR, VR)) extends AnyVal {
-    def asKeyValueMapper: KeyValueMapper[K, V, KeyValue[KR, VR]] = (key: K, value: V) => {
-      val (kr, vr) = f(key, value)
-      KeyValue.pair(kr, vr)
-    }
-  }
-
-  implicit class ValueMapperFromFunction[V, VR](val f: V => VR) extends AnyVal {
-    def asValueMapper: ValueMapper[V, VR] = (value: V) => f(value)
-  }
-
-  implicit class FlatValueMapperFromFunction[V, VR](val f: V => Iterable[VR]) extends AnyVal {
-    def asValueMapper: ValueMapper[V, JIterable[VR]] = (value: V) => f(value).asJava
-  }
-
-  implicit class ValueMapperWithKeyFromFunction[K, V, VR](val f: (K, V) => VR) extends AnyVal {
-    def asValueMapperWithKey: ValueMapperWithKey[K, V, VR] = (readOnlyKey: K, value: V) => f(readOnlyKey, value)
-  }
-
-  implicit class FlatValueMapperWithKeyFromFunction[K, V, VR](val f: (K, V) => Iterable[VR]) extends AnyVal {
-    def asValueMapperWithKey: ValueMapperWithKey[K, V, JIterable[VR]] =
-      (readOnlyKey: K, value: V) => f(readOnlyKey, value).asJava
-  }
-
-  implicit class AggregatorFromFunction[K, V, VA](val f: (K, V, VA) => VA) extends AnyVal {
-    def asAggregator: Aggregator[K, V, VA] = (key: K, value: V, aggregate: VA) => f(key, value, aggregate)
-  }
-
-  implicit class MergerFromFunction[K, VR](val f: (K, VR, VR) => VR) extends AnyVal {
-    def asMerger: Merger[K, VR] = (aggKey: K, aggOne: VR, aggTwo: VR) => f(aggKey, aggOne, aggTwo)
-  }
-
-  implicit class ReducerFromFunction[V](val f: (V, V) => V) extends AnyVal {
-    def asReducer: Reducer[V] = (value1: V, value2: V) => f(value1, value2)
-  }
-
-  implicit class InitializerFromFunction[VA](val f: () => VA) extends AnyVal {
-    def asInitializer: Initializer[VA] = () => f()
-  }
-
-  implicit class TransformerSupplierFromFunction[K, V, VO](val f: () => Transformer[K, V, VO]) extends AnyVal {
-    def asTransformerSupplier: TransformerSupplier[K, V, VO] = () => f()
-  }
-}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
index ccabeb7..7aba69d 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -304,7 +304,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
     new KStream(inner.toStream[KR](mapper.asKeyValueMapper, named))
 
   /**
-   * Suppress some updates from this changelog stream, determined by the supplied [[Suppressed]] configuration.
+   * Suppress some updates from this changelog stream, determined by the supplied [[org.apache.kafka.streams.kstream.Suppressed]] configuration.
    *
    * This controls what updates downstream table and stream operations will receive.
    *
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Suppressed.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Suppressed.scala
deleted file mode 100644
index 22b20b9..0000000
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Suppressed.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.scala.kstream
-
-import java.time.Duration
-
-import org.apache.kafka.streams.kstream.{Windowed, Suppressed => SupressedJ}
-import org.apache.kafka.streams.kstream.Suppressed.{
-  EagerBufferConfig,
-  StrictBufferConfig,
-  BufferConfig => BufferConfigJ
-}
-import org.apache.kafka.streams.kstream.internals.suppress.{
-  EagerBufferConfigImpl,
-  FinalResultsSuppressionBuilder,
-  StrictBufferConfigImpl,
-  SuppressedInternal
-}
-
-/**
- * Duplicates the static factory methods inside the Java interface [[org.apache.kafka.streams.kstream.Suppressed]].
- *
- * This was required for compatibility w/ Scala 2.11 + Java 1.8 because the Scala 2.11 compiler doesn't support the use
- * of static methods inside Java interfaces. We have since dropped Scala 2.11 support.
- */
-@deprecated(message = "Use org.apache.kafka.streams.kstream.Suppressed", since = "2.5")
-object Suppressed {
-
-  /**
-   * Configure the suppression to emit only the "final results" from the window.
-   *
-   * By default all Streams operators emit results whenever new results are available.
-   * This includes windowed operations.
-   *
-   * This configuration will instead emit just one result per key for each window, guaranteeing
-   * to deliver only the final result. This option is suitable for use cases in which the business logic
-   * requires a hard guarantee that only the final result is propagated. For example, sending alerts.
-   *
-   * To accomplish this, the operator will buffer events from the window until the window close (that is,
-   * until the end-time passes, and additionally until the grace period expires). Since windowed operators
-   * are required to reject late events for a window whose grace period is expired, there is an additional
-   * guarantee that the final results emitted from this suppression will match any queriable state upstream.
-   *
-   * @param bufferConfig A configuration specifying how much space to use for buffering intermediate results.
-   *                     This is required to be a "strict" config, since it would violate the "final results"
-   *                     property to emit early and then issue an update later.
-   * @tparam K The [[Windowed]] key type for the KTable to apply this suppression to.
-   * @return a "final results" mode suppression configuration
-   * @see [[org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit]]
-   */
-  def untilWindowCloses[K](bufferConfig: StrictBufferConfig): SupressedJ[Windowed[K]] =
-    new FinalResultsSuppressionBuilder[Windowed[K]](null, bufferConfig)
-
-  /**
-   * Configure the suppression to wait `timeToWaitForMoreEvents` amount of time after receiving a record
-   * before emitting it further downstream. If another record for the same key arrives in the mean time, it replaces
-   * the first record in the buffer but does <em>not</em> re-start the timer.
-   *
-   * @param timeToWaitForMoreEvents The amount of time to wait, per record, for new events.
-   * @param bufferConfig A configuration specifying how much space to use for buffering intermediate results.
-   * @tparam K The key type for the KTable to apply this suppression to.
-   * @return a suppression configuration
-   * @see [[org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit]]
-   */
-  def untilTimeLimit[K](timeToWaitForMoreEvents: Duration, bufferConfig: BufferConfigJ[_]): SupressedJ[K] =
-    new SuppressedInternal[K](null, timeToWaitForMoreEvents, bufferConfig, null, false)
-
-  /**
-   * Duplicates the static factory methods inside the Java interface
-   * [[org.apache.kafka.streams.kstream.Suppressed.BufferConfig]].
-   */
-  object BufferConfig {
-
-    /**
-     * Create a size-constrained buffer in terms of the maximum number of keys it will store.
-     *
-     * @param recordLimit maximum number of keys to buffer.
-     * @return size-constrained buffer in terms of the maximum number of keys it will store.
-     * @see [[org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords]]
-     */
-    def maxRecords(recordLimit: Long): EagerBufferConfig =
-      new EagerBufferConfigImpl(recordLimit, Long.MaxValue)
-
-    /**
-     * Create a size-constrained buffer in terms of the maximum number of bytes it will use.
-     *
-     * @param byteLimit maximum number of bytes to buffer.
-     * @return size-constrained buffer in terms of the maximum number of bytes it will use.
-     * @see [[org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes]]
-     */
-    def maxBytes(byteLimit: Long): EagerBufferConfig =
-      new EagerBufferConfigImpl(Long.MaxValue, byteLimit)
-
-    /**
-     * Create a buffer unconstrained by size (either keys or bytes).
-     *
-     * As a result, the buffer will consume as much memory as it needs, dictated by the time bound.
-     *
-     * If there isn't enough heap available to meet the demand, the application will encounter an
-     * [[OutOfMemoryError]] and shut down (not guaranteed to be a graceful exit). Also, note that
-     * JVM processes under extreme memory pressure may exhibit poor GC behavior.
-     *
-     * This is a convenient option if you doubt that your buffer will be that large, but also don't
-     * wish to pick particular constraints, such as in testing.
-     *
-     * This buffer is "strict" in the sense that it will enforce the time bound or crash.
-     * It will never emit early.
-     *
-     * @return a buffer unconstrained by size (either keys or bytes).
-     * @see [[org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded]]
-     */
-    def unbounded(): StrictBufferConfig = new StrictBufferConfigImpl()
-  }
-}
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SuppressedTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SuppressedTest.scala
deleted file mode 100644
index 86936a2..0000000
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SuppressedTest.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.scala.kstream
-
-import org.apache.kafka.streams.kstream.internals.suppress._
-import org.apache.kafka.streams.scala.kstream.Suppressed.BufferConfig
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
-
-import java.time.Duration
-
-@deprecated(message = "org.apache.kafka.streams.scala.kstream.Suppressed has been deprecated", since = "2.5")
-class SuppressedTest {
-
-  @Test
-  def testProduceCorrectSuppressionUntilWindowCloses(): Unit = {
-    val bufferConfig = BufferConfig.unbounded()
-    val suppression = Suppressed.untilWindowCloses[String](bufferConfig)
-    assertEquals(suppression, new FinalResultsSuppressionBuilder(null, bufferConfig))
-    assertEquals(suppression.withName("soup"), new FinalResultsSuppressionBuilder("soup", bufferConfig))
-  }
-
-  @Test
-  def testProduceCorrectSuppressionUntilTimeLimit(): Unit = {
-    val bufferConfig = BufferConfig.unbounded()
-    val duration = Duration.ofMillis(1)
-    assertEquals(Suppressed.untilTimeLimit[String](duration, bufferConfig),
-                 new SuppressedInternal[String](null, duration, bufferConfig, null, false))
-  }
-
-  @Test
-  def testProduceCorrectBufferConfigWithMaxRecords(): Unit = {
-    assertEquals(BufferConfig.maxRecords(4), new EagerBufferConfigImpl(4, Long.MaxValue))
-    assertEquals(BufferConfig.maxRecords(4).withMaxBytes(5), new EagerBufferConfigImpl(4, 5))
-  }
-
-  @Test
-  def testProduceCorrectBufferConfigWithMaxBytes(): Unit = {
-    assertEquals(BufferConfig.maxBytes(4), new EagerBufferConfigImpl(Long.MaxValue, 4))
-    assertEquals(BufferConfig.maxBytes(4).withMaxRecords(5), new EagerBufferConfigImpl(5, 4))
-  }
-
-  @Test
-  def testProduceCorrectBufferConfigWithUnbounded(): Unit =
-    assertEquals(BufferConfig.unbounded(),
-                 new StrictBufferConfigImpl(Long.MaxValue, Long.MaxValue, BufferFullStrategy.SHUT_DOWN))
-
-  @Test
-  def testSupportLongChainsOfFactoryMethods(): Unit = {
-    val bc1 = BufferConfig
-      .unbounded()
-      .emitEarlyWhenFull()
-      .withMaxRecords(3L)
-      .withMaxBytes(4L)
-      .withMaxRecords(5L)
-      .withMaxBytes(6L)
-    assertEquals(new EagerBufferConfigImpl(5L, 6L), bc1)
-    assertEquals(new StrictBufferConfigImpl(5L, 6L, BufferFullStrategy.SHUT_DOWN), bc1.shutDownWhenFull())
-
-    val bc2 = BufferConfig
-      .maxBytes(4)
-      .withMaxRecords(5)
-      .withMaxBytes(6)
-      .withNoBound()
-      .withMaxBytes(7)
-      .withMaxRecords(8)
-
-    assertEquals(new StrictBufferConfigImpl(8L, 7L, BufferFullStrategy.SHUT_DOWN), bc2)
-    assertEquals(BufferConfig.unbounded(), bc2.withNoBound())
-
-    val bc3 = BufferConfig
-      .maxRecords(5L)
-      .withMaxBytes(10L)
-      .emitEarlyWhenFull()
-      .withMaxRecords(11L)
-
-    assertEquals(new EagerBufferConfigImpl(11L, 10L), bc3)
-  }
-}