You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2021/05/27 09:32:22 UTC
[kafka] branch trunk updated: KAFKA-12796: Removal of deprecated
classes under streams-scala (#10710)
This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a02b19c KAFKA-12796: Removal of deprecated classes under streams-scala (#10710)
a02b19c is described below
commit a02b19cb77084a573a25be1b75fc195b9a9c1f9b
Author: Josep Prat <jo...@aiven.io>
AuthorDate: Thu May 27 11:30:15 2021 +0200
KAFKA-12796: Removal of deprecated classes under streams-scala (#10710)
Removes classes and methods under streams-scala that were deprecated by earlier KIPs
Reviewers: Bruno Cadonna <ca...@apache.org>
---
gradle/spotbugs-exclude.xml | 6 -
.../kafka/streams/scala/FunctionConversions.scala | 83 -------------
.../kafka/streams/scala/kstream/KTable.scala | 2 +-
.../kafka/streams/scala/kstream/Suppressed.scala | 128 ---------------------
.../streams/scala/kstream/SuppressedTest.scala | 93 ---------------
5 files changed, 1 insertion(+), 311 deletions(-)
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index ab60dfd..a4e894c 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -423,12 +423,6 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
<Match>
<Package name="org.apache.kafka.streams.scala"/>
- <Source name="FunctionConversions.scala"/>
- <Bug pattern="EQ_UNUSUAL"/>
- </Match>
-
- <Match>
- <Package name="org.apache.kafka.streams.scala"/>
<Source name="FunctionsCompatConversions.scala"/>
<Bug pattern="EQ_UNUSUAL"/>
</Match>
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
deleted file mode 100644
index 892419a..0000000
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.scala
-
-import org.apache.kafka.streams.KeyValue
-import org.apache.kafka.streams.kstream._
-import scala.jdk.CollectionConverters._
-import java.lang.{Iterable => JIterable}
-
-@deprecated("This object is for internal use only", since = "2.1.0")
-object FunctionConversions {
-
- implicit private[scala] class ForeachActionFromFunction[K, V](val p: (K, V) => Unit) extends AnyVal {
- def asForeachAction: ForeachAction[K, V] = (key, value) => p(key, value)
- }
-
- implicit class PredicateFromFunction[K, V](val p: (K, V) => Boolean) extends AnyVal {
- def asPredicate: Predicate[K, V] = (key: K, value: V) => p(key, value)
- }
-
- implicit class MapperFromFunction[T, U, VR](val f: (T, U) => VR) extends AnyVal {
- def asKeyValueMapper: KeyValueMapper[T, U, VR] = (key: T, value: U) => f(key, value)
- def asValueJoiner: ValueJoiner[T, U, VR] = (value1: T, value2: U) => f(value1, value2)
- }
-
- implicit class KeyValueMapperFromFunction[K, V, KR, VR](val f: (K, V) => (KR, VR)) extends AnyVal {
- def asKeyValueMapper: KeyValueMapper[K, V, KeyValue[KR, VR]] = (key: K, value: V) => {
- val (kr, vr) = f(key, value)
- KeyValue.pair(kr, vr)
- }
- }
-
- implicit class ValueMapperFromFunction[V, VR](val f: V => VR) extends AnyVal {
- def asValueMapper: ValueMapper[V, VR] = (value: V) => f(value)
- }
-
- implicit class FlatValueMapperFromFunction[V, VR](val f: V => Iterable[VR]) extends AnyVal {
- def asValueMapper: ValueMapper[V, JIterable[VR]] = (value: V) => f(value).asJava
- }
-
- implicit class ValueMapperWithKeyFromFunction[K, V, VR](val f: (K, V) => VR) extends AnyVal {
- def asValueMapperWithKey: ValueMapperWithKey[K, V, VR] = (readOnlyKey: K, value: V) => f(readOnlyKey, value)
- }
-
- implicit class FlatValueMapperWithKeyFromFunction[K, V, VR](val f: (K, V) => Iterable[VR]) extends AnyVal {
- def asValueMapperWithKey: ValueMapperWithKey[K, V, JIterable[VR]] =
- (readOnlyKey: K, value: V) => f(readOnlyKey, value).asJava
- }
-
- implicit class AggregatorFromFunction[K, V, VA](val f: (K, V, VA) => VA) extends AnyVal {
- def asAggregator: Aggregator[K, V, VA] = (key: K, value: V, aggregate: VA) => f(key, value, aggregate)
- }
-
- implicit class MergerFromFunction[K, VR](val f: (K, VR, VR) => VR) extends AnyVal {
- def asMerger: Merger[K, VR] = (aggKey: K, aggOne: VR, aggTwo: VR) => f(aggKey, aggOne, aggTwo)
- }
-
- implicit class ReducerFromFunction[V](val f: (V, V) => V) extends AnyVal {
- def asReducer: Reducer[V] = (value1: V, value2: V) => f(value1, value2)
- }
-
- implicit class InitializerFromFunction[VA](val f: () => VA) extends AnyVal {
- def asInitializer: Initializer[VA] = () => f()
- }
-
- implicit class TransformerSupplierFromFunction[K, V, VO](val f: () => Transformer[K, V, VO]) extends AnyVal {
- def asTransformerSupplier: TransformerSupplier[K, V, VO] = () => f()
- }
-}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
index ccabeb7..7aba69d 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -304,7 +304,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
new KStream(inner.toStream[KR](mapper.asKeyValueMapper, named))
/**
- * Suppress some updates from this changelog stream, determined by the supplied [[Suppressed]] configuration.
+ * Suppress some updates from this changelog stream, determined by the supplied [[org.apache.kafka.streams.kstream.Suppressed]] configuration.
*
* This controls what updates downstream table and stream operations will receive.
*
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Suppressed.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Suppressed.scala
deleted file mode 100644
index 22b20b9..0000000
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Suppressed.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.scala.kstream
-
-import java.time.Duration
-
-import org.apache.kafka.streams.kstream.{Windowed, Suppressed => SupressedJ}
-import org.apache.kafka.streams.kstream.Suppressed.{
- EagerBufferConfig,
- StrictBufferConfig,
- BufferConfig => BufferConfigJ
-}
-import org.apache.kafka.streams.kstream.internals.suppress.{
- EagerBufferConfigImpl,
- FinalResultsSuppressionBuilder,
- StrictBufferConfigImpl,
- SuppressedInternal
-}
-
-/**
- * Duplicates the static factory methods inside the Java interface [[org.apache.kafka.streams.kstream.Suppressed]].
- *
- * This was required for compatibility w/ Scala 2.11 + Java 1.8 because the Scala 2.11 compiler doesn't support the use
- * of static methods inside Java interfaces. We have since dropped Scala 2.11 support.
- */
-@deprecated(message = "Use org.apache.kafka.streams.kstream.Suppressed", since = "2.5")
-object Suppressed {
-
- /**
- * Configure the suppression to emit only the "final results" from the window.
- *
- * By default all Streams operators emit results whenever new results are available.
- * This includes windowed operations.
- *
- * This configuration will instead emit just one result per key for each window, guaranteeing
- * to deliver only the final result. This option is suitable for use cases in which the business logic
- * requires a hard guarantee that only the final result is propagated. For example, sending alerts.
- *
- * To accomplish this, the operator will buffer events from the window until the window close (that is,
- * until the end-time passes, and additionally until the grace period expires). Since windowed operators
- * are required to reject late events for a window whose grace period is expired, there is an additional
- * guarantee that the final results emitted from this suppression will match any queriable state upstream.
- *
- * @param bufferConfig A configuration specifying how much space to use for buffering intermediate results.
- * This is required to be a "strict" config, since it would violate the "final results"
- * property to emit early and then issue an update later.
- * @tparam K The [[Windowed]] key type for the KTable to apply this suppression to.
- * @return a "final results" mode suppression configuration
- * @see [[org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit]]
- */
- def untilWindowCloses[K](bufferConfig: StrictBufferConfig): SupressedJ[Windowed[K]] =
- new FinalResultsSuppressionBuilder[Windowed[K]](null, bufferConfig)
-
- /**
- * Configure the suppression to wait `timeToWaitForMoreEvents` amount of time after receiving a record
- * before emitting it further downstream. If another record for the same key arrives in the mean time, it replaces
- * the first record in the buffer but does <em>not</em> re-start the timer.
- *
- * @param timeToWaitForMoreEvents The amount of time to wait, per record, for new events.
- * @param bufferConfig A configuration specifying how much space to use for buffering intermediate results.
- * @tparam K The key type for the KTable to apply this suppression to.
- * @return a suppression configuration
- * @see [[org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit]]
- */
- def untilTimeLimit[K](timeToWaitForMoreEvents: Duration, bufferConfig: BufferConfigJ[_]): SupressedJ[K] =
- new SuppressedInternal[K](null, timeToWaitForMoreEvents, bufferConfig, null, false)
-
- /**
- * Duplicates the static factory methods inside the Java interface
- * [[org.apache.kafka.streams.kstream.Suppressed.BufferConfig]].
- */
- object BufferConfig {
-
- /**
- * Create a size-constrained buffer in terms of the maximum number of keys it will store.
- *
- * @param recordLimit maximum number of keys to buffer.
- * @return size-constrained buffer in terms of the maximum number of keys it will store.
- * @see [[org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords]]
- */
- def maxRecords(recordLimit: Long): EagerBufferConfig =
- new EagerBufferConfigImpl(recordLimit, Long.MaxValue)
-
- /**
- * Create a size-constrained buffer in terms of the maximum number of bytes it will use.
- *
- * @param byteLimit maximum number of bytes to buffer.
- * @return size-constrained buffer in terms of the maximum number of bytes it will use.
- * @see [[org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes]]
- */
- def maxBytes(byteLimit: Long): EagerBufferConfig =
- new EagerBufferConfigImpl(Long.MaxValue, byteLimit)
-
- /**
- * Create a buffer unconstrained by size (either keys or bytes).
- *
- * As a result, the buffer will consume as much memory as it needs, dictated by the time bound.
- *
- * If there isn't enough heap available to meet the demand, the application will encounter an
- * [[OutOfMemoryError]] and shut down (not guaranteed to be a graceful exit). Also, note that
- * JVM processes under extreme memory pressure may exhibit poor GC behavior.
- *
- * This is a convenient option if you doubt that your buffer will be that large, but also don't
- * wish to pick particular constraints, such as in testing.
- *
- * This buffer is "strict" in the sense that it will enforce the time bound or crash.
- * It will never emit early.
- *
- * @return a buffer unconstrained by size (either keys or bytes).
- * @see [[org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded]]
- */
- def unbounded(): StrictBufferConfig = new StrictBufferConfigImpl()
- }
-}
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SuppressedTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SuppressedTest.scala
deleted file mode 100644
index 86936a2..0000000
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SuppressedTest.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.scala.kstream
-
-import org.apache.kafka.streams.kstream.internals.suppress._
-import org.apache.kafka.streams.scala.kstream.Suppressed.BufferConfig
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
-
-import java.time.Duration
-
-@deprecated(message = "org.apache.kafka.streams.scala.kstream.Suppressed has been deprecated", since = "2.5")
-class SuppressedTest {
-
- @Test
- def testProduceCorrectSuppressionUntilWindowCloses(): Unit = {
- val bufferConfig = BufferConfig.unbounded()
- val suppression = Suppressed.untilWindowCloses[String](bufferConfig)
- assertEquals(suppression, new FinalResultsSuppressionBuilder(null, bufferConfig))
- assertEquals(suppression.withName("soup"), new FinalResultsSuppressionBuilder("soup", bufferConfig))
- }
-
- @Test
- def testProduceCorrectSuppressionUntilTimeLimit(): Unit = {
- val bufferConfig = BufferConfig.unbounded()
- val duration = Duration.ofMillis(1)
- assertEquals(Suppressed.untilTimeLimit[String](duration, bufferConfig),
- new SuppressedInternal[String](null, duration, bufferConfig, null, false))
- }
-
- @Test
- def testProduceCorrectBufferConfigWithMaxRecords(): Unit = {
- assertEquals(BufferConfig.maxRecords(4), new EagerBufferConfigImpl(4, Long.MaxValue))
- assertEquals(BufferConfig.maxRecords(4).withMaxBytes(5), new EagerBufferConfigImpl(4, 5))
- }
-
- @Test
- def testProduceCorrectBufferConfigWithMaxBytes(): Unit = {
- assertEquals(BufferConfig.maxBytes(4), new EagerBufferConfigImpl(Long.MaxValue, 4))
- assertEquals(BufferConfig.maxBytes(4).withMaxRecords(5), new EagerBufferConfigImpl(5, 4))
- }
-
- @Test
- def testProduceCorrectBufferConfigWithUnbounded(): Unit =
- assertEquals(BufferConfig.unbounded(),
- new StrictBufferConfigImpl(Long.MaxValue, Long.MaxValue, BufferFullStrategy.SHUT_DOWN))
-
- @Test
- def testSupportLongChainsOfFactoryMethods(): Unit = {
- val bc1 = BufferConfig
- .unbounded()
- .emitEarlyWhenFull()
- .withMaxRecords(3L)
- .withMaxBytes(4L)
- .withMaxRecords(5L)
- .withMaxBytes(6L)
- assertEquals(new EagerBufferConfigImpl(5L, 6L), bc1)
- assertEquals(new StrictBufferConfigImpl(5L, 6L, BufferFullStrategy.SHUT_DOWN), bc1.shutDownWhenFull())
-
- val bc2 = BufferConfig
- .maxBytes(4)
- .withMaxRecords(5)
- .withMaxBytes(6)
- .withNoBound()
- .withMaxBytes(7)
- .withMaxRecords(8)
-
- assertEquals(new StrictBufferConfigImpl(8L, 7L, BufferFullStrategy.SHUT_DOWN), bc2)
- assertEquals(BufferConfig.unbounded(), bc2.withNoBound())
-
- val bc3 = BufferConfig
- .maxRecords(5L)
- .withMaxBytes(10L)
- .emitEarlyWhenFull()
- .withMaxRecords(11L)
-
- assertEquals(new EagerBufferConfigImpl(11L, 10L), bc3)
- }
-}