Posted to commits@kafka.apache.org by gu...@apache.org on 2018/05/07 16:44:50 UTC
[kafka] branch trunk updated: MINOR: Remove deprecated KTable#writeAs, print, foreach, to, through (#4910)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 42771eb MINOR: Remove deprecated KTable#writeAs, print, foreach, to, through (#4910)
42771eb is described below
commit 42771eb37d6abd0a34220d30773e559680bdf9b0
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Mon May 7 09:44:45 2018 -0700
MINOR: Remove deprecated KTable#writeAs, print, foreach, to, through (#4910)
Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
.../org/apache/kafka/streams/kstream/KTable.java | 640 ---------------------
.../streams/kstream/internals/KTableImpl.java | 258 ---------
.../KStreamAggregationDedupIntegrationTest.java | 6 +-
.../KStreamAggregationIntegrationTest.java | 9 +-
.../kstream/internals/KGroupedTableImplTest.java | 2 +-
.../kstream/internals/KTableFilterTest.java | 1 +
.../kstream/internals/KTableForeachTest.java | 111 ----
.../streams/kstream/internals/KTableImplTest.java | 109 ++--
.../kstream/internals/KTableMapValuesTest.java | 88 +--
9 files changed, 95 insertions(+), 1129 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 55555a5..1aaad1e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.kstream;
-import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
@@ -24,9 +23,7 @@ import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner;
import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreType;
@@ -594,219 +591,6 @@ public interface KTable<K, V> {
final Serde<VR> valueSerde,
final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
-
- /**
- * Print the updated records of this {@code KTable} to {@code System.out}.
- * This function will use the generated name of the parent processor node to label the key/value pairs printed to
- * the console.
- * <p>
- * The default serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
- * {@code toString()} on the deserialized object.
- * <p>
- * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
- * {@link Integer} etc. to get meaningful information.
- * <p>
- * Note that {@code print()} is not applied to the internal state store and only called for each new {@code KTable}
- * updated record.
- * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
- * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
- * convert to a {@link KStream} using {@link #toStream()} and then use
- * {@link KStream#print(Printed) print(Printed.toSysOut())} on the result.
- */
- @Deprecated
- void print();
-
- /**
- * Print the updated records of this {@code KTable} to {@code System.out}.
- * This function will use the given name to label the key/value pairs printed to the console.
- * <p>
- * The default serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
- * {@code toString()} on the deserialized object.
- * <p>
- * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
- * {@link Integer} etc. to get meaningful information.
- * <p>
- * Note that {@code print()} is not applied to the internal state store and only called for each new {@code KTable}
- * updated record.
- *
- * @param label the name used to label the key/value pairs printed to the console
- * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
- * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
- * convert to a {@link KStream} using {@link #toStream()} and then use
- * {@link KStream#print(Printed) print(Printed.toSysOut().withLabel(label))} on the result.
- */
- @Deprecated
- void print(final String label);
-
- /**
- * Print the updated records of this {@code KTable} to {@code System.out}.
- * This function will use the generated name of the parent processor node to label the key/value pairs printed to
- * the console.
- * <p>
- * The provided serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
- * {@code toString()} on the deserialized object.
- * <p>
- * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
- * {@link Integer} etc. to get meaningful information.
- * <p>
- * Note that {@code print()} is not applied to the internal state store and only called for each new {@code KTable}
- * updated record.
- *
- * @param keySerde key serde used to deserialize key if type is {@code byte[]},
- * @param valSerde value serde used to deserialize value if type is {@code byte[]}
- * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
- * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
- * convert to a {@link KStream} using {@link #toStream()} and then use
- * {@link KStream#print(Printed) print(Printed.toSysOut().withKeyValueMapper(...)} on the result.
- */
- @Deprecated
- void print(final Serde<K> keySerde,
- final Serde<V> valSerde);
-
- /**
- * Print the updated records of this {@code KTable} to {@code System.out}.
- * This function will use the given name to label the key/value pairs printed to the console.
- * <p>
- * The provided serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
- * {@code toString()} on the deserialized object.
- * <p>
- * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
- * {@link Integer} etc. to get meaningful information.
- * <p>
- * Note that {@code print()} is not applied to the internal state store and only called for each new {@code KTable}
- * updated record.
- *
- * @param keySerde key serde used to deserialize key if type is {@code byte[]},
- * @param valSerde value serde used to deserialize value if type is {@code byte[]},
- * @param label the name used to label the key/value pairs printed to the console
- * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
- * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
- * convert to a {@link KStream} using {@link #toStream()} and then use
- * {@link KStream#print(Printed) print(Printed.toSysOut().withLabel(label).withKeyValueMapper(...)} on the result.
- */
- @Deprecated
- void print(final Serde<K> keySerde,
- final Serde<V> valSerde,
- final String label);
-
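The deprecation notes above point at two replacements for the removed print() overloads. A minimal sketch, assuming a KTable<String, Long> table materialized as "my-store", a running KafkaStreams instance named streams, and the usual org.apache.kafka.streams imports (names are hypothetical):

    // 1) Print every update by first converting the KTable to a KStream:
    table.toStream().print(Printed.<String, Long>toSysOut().withLabel("my-label"));

    // 2) Or iterate the current table contents via Interactive Queries:
    final ReadOnlyKeyValueStore<String, Long> store =
        streams.store("my-store", QueryableStoreTypes.<String, Long>keyValueStore());
    try (final KeyValueIterator<String, Long> it = store.all()) {
        while (it.hasNext()) {
            final KeyValue<String, Long> entry = it.next();
            System.out.println(entry.key + ", " + entry.value);
        }
    }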
- /**
- * Write the updated records of this {@code KTable} to a file at the given path.
- * This function will use the generated name of the parent processor node to label the key/value pairs printed to
- * the file.
- * <p>
- * The default serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
- * {@code toString()} on the deserialized object.
- * <p>
- * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
- * {@link Integer} etc. to get meaningful information.
- * <p>
- * Note that {@code writeAsText()} is not applied to the internal state store and only called for each new
- * {@code KTable} updated record.
- *
- * @param filePath name of file to write to
- * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
- * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
- * convert to a {@link KStream} using {@link #toStream()} and then use
- * {@link KStream#print(Printed) print(Printed.toFile(filePath)} on the result.
- */
- @Deprecated
- void writeAsText(final String filePath);
-
- /**
- * Write the updated records of this {@code KTable} to a file at the given path.
- * This function will use the given name to label the key/value printed to the file.
- * <p>
- * The default serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
- * {@code toString()} on the deserialized object.
- * <p>
- * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
- * {@link Integer} etc. to get meaningful information.
- * <p>
- * Note that {@code writeAsText()} is not applied to the internal state store and only called for each new
- * {@code KTable} updated record.
- *
- * @param filePath name of file to write to
- * @param label the name used to label the key/value pairs printed out to the console
- * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
- * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
- * convert to a {@link KStream} using {@link #toStream()} and then use
- * {@link KStream#print(Printed) print(Printed.toFile(filePath).withLabel(label)} on the result.
- */
- @Deprecated
- void writeAsText(final String filePath,
- final String label);
-
- /**
- * Write the updated records of this {@code KTable} to a file at the given path.
- * This function will use the generated name of the parent processor node to label the key/value pairs printed to
- * the file.
- * <p>
- * The provided serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
- * {@code toString()} on the deserialized object.
- * <p>
- * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
- * {@link Integer} etc. to get meaningful information.
- * <p>
- * Note that {@code writeAsText()} is not applied to the internal state store and only called for each new
- * {@code KTable} updated record.
- *
- * @param filePath name of file to write to
- * @param keySerde key serde used to deserialize key if type is {@code byte[]},
- * @param valSerde value serde used to deserialize value if type is {@code byte[]}
- * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
- * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
- * convert to a {@link KStream} using {@link #toStream()} and then use
- * {@link KStream#print(Printed) print(Printed.toFile(filePath).withKeyValueMapper(...)} on the result.
- */
- @Deprecated
- void writeAsText(final String filePath,
- final Serde<K> keySerde,
- final Serde<V> valSerde);
-
- /**
- * Write the updated records of this {@code KTable} to a file at the given path.
- * This function will use the given name to label the key/value printed to the file.
- * <p>
- * The provided serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
- * {@code toString()} on the deserialized object.
- * <p>
- * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
- * {@link Integer} etc. to get meaningful information.
- * <p>
- * Note that {@code writeAsText()} is not applied to the internal state store and only called for each new
- * {@code KTable} updated record.
- *
- * @param filePath name of file to write to
- * @param label the name used to label the key/value pairs printed to the console
- * @param keySerde key serde used to deserialize key if type is {@code byte[]},
- * @param valSerde value serde used to deserialize value if type is {@code byte[]}
- * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
- * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
- * convert to a {@link KStream} using {@link #toStream()} and then use
- * {@link KStream#print(Printed) print(Printed.toFile(filePath).withLabel(label).withKeyValueMapper(...)} on the result.
- */
- @Deprecated
- void writeAsText(final String filePath,
- final String label,
- final Serde<K> keySerde,
- final Serde<V> valSerde);
-
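Similarly, the removed writeAsText() overloads map onto Printed.toFile(); a sketch with a hypothetical file path and label, again assuming a KTable<String, Long> table:

    // Instead of table.writeAsText("/tmp/table-updates.txt", "my-label"):
    table.toStream().print(Printed.<String, Long>toFile("/tmp/table-updates.txt").withLabel("my-label"));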
- /**
- * Perform an action on each updated record of this {@code KTable}.
- * Note that this is a terminal operation that returns void.
- * <p>
- * Note that {@code foreach()} is not applied to the internal state store and only called for each new
- * {@code KTable} updated record.
- *
- * @param action an action to perform on each record
- * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
- * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
- * convert to a {@link KStream} using {@link #toStream()} and then use
- * {@link KStream#foreach(ForeachAction) foreach(action)} on the result.
- */
- @Deprecated
- void foreach(final ForeachAction<? super K, ? super V> action);
-
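The removed foreach() maps onto KStream#foreach() after toStream(); a sketch in the anonymous-class style used elsewhere in this code, assuming a KTable<String, Long> table:

    // Instead of table.foreach(action):
    table.toStream().foreach(new ForeachAction<String, Long>() {
        @Override
        public void apply(final String key, final Long value) {
            System.out.println(key + " -> " + value);
        }
    });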
/**
* Convert this changelog stream to a {@link KStream}.
* <p>
@@ -845,430 +629,6 @@ public interface KTable<K, V> {
<KR> KStream<KR, V> toStream(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper);
/**
- * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using default
- * serializers and deserializers and producer's {@link DefaultPartitioner}.
- * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
- * started).
- * <p>
- * This is equivalent to calling {@link #to(String) #to(someTopicName)} and
- * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
- * <p>
- * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
- * {@link KStreamBuilder#table(String, String)})
- * The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
- *
- * @param topic the topic name
- * @param queryableStoreName the state store name used for the result {@code KTable}; valid characters are ASCII
- * alphanumerics, '.', '_' and '-'. If {@code null} this is the equivalent of {@link KTable#through(String)}
- * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
- * @deprecated use {@link #toStream()} followed by {@link KStream#to(String) to(topic)} and
- * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(queryableStoreName))}
- * to read back as a {@code KTable}
- */
- @Deprecated
- KTable<K, V> through(final String topic,
- final String queryableStoreName);
-
- /**
- * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using default
- * serializers and deserializers and producer's {@link DefaultPartitioner}.
- * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
- * started).
- * <p>
- * This is equivalent to calling {@link #to(String) #to(someTopicName)} and
- * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
- * <p>
- * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
- * {@link KStreamBuilder#table(String, String)})
- * The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
- *
- * @param topic the topic name
- * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
- * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
- * @deprecated use {@link #toStream()} followed by {@link KStream#to(String) to(topic)} and
- * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(KeyValueBytesStoreSupplier))}
- * to read back as a {@code KTable}
- */
- @Deprecated
- KTable<K, V> through(final String topic,
- final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
-
- /**
- * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using default
- * serializers and deserializers and producer's {@link DefaultPartitioner}.
- * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
- * started).
- * <p>
- * This is equivalent to calling {@link #to(String) #to(someTopicName)} and
- * {@link KStreamBuilder#table(String) KStreamBuilder#table(someTopicName)}.
- * <p>
- * The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf.
- * {@link KStreamBuilder#table(String)})
- *
- * @param topic the topic name
- * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
- * @deprecated use {@link #toStream()} followed by {@link KStream#to(String) to(topic)} and
- * {@link StreamsBuilder#table(String) StreamsBuilder#table(topic)} to read back as a {@code KTable}
- */
- @Deprecated
- KTable<K, V> through(final String topic);
-
- /**
- * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using default
- * serializers and deserializers and a customizable {@link StreamPartitioner} to determine the distribution of
- * records to partitions.
- * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
- * started).
- * <p>
- * This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and
- * {@link KStreamBuilder#table(String) KStreamBuilder#table(someTopicName)}.
- * <p>
- * The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf.
- * {@link KStreamBuilder#table(String)})
- *
- * @param partitioner the function used to determine how records are distributed among partitions of the topic,
- * if not specified producer's {@link DefaultPartitioner} will be used
- * @param topic the topic name
- * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
- * @deprecated use {@link #toStream()} followed by
- * {@link KStream#to(String, Produced) to(topic, Produced.streamPartitioner(partitioner))} and
- * {@link StreamsBuilder#table(String) StreamsBuilder#table(topic)} to read back as a {@code KTable}
- */
- @Deprecated
- KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
- final String topic);
-
- /**
- * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using default
- * serializers and deserializers and a customizable {@link StreamPartitioner} to determine the distribution of
- * records to partitions.
- * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
- * started).
- * <p>
- * This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and
- * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
- * <p>
- * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
- * {@link KStreamBuilder#table(String, String)})
- *
- * @param partitioner the function used to determine how records are distributed among partitions of the topic,
- * if not specified producer's {@link DefaultPartitioner} will be used
- * @param topic the topic name
- * @param queryableStoreName the state store name used for the result {@code KTable}.
- * If {@code null} this is the equivalent of {@link KTable#through(StreamPartitioner, String)}
- * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
- * @deprecated use {@link #toStream()} followed by
- * {@link KStream#to(String, Produced) to(topic, Produced.streamPartitioner(partitioner))} and
- * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(queryableStoreName))}
- * to read back as a {@code KTable}
- */
- @Deprecated
- KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
- final String topic,
- final String queryableStoreName);
-
- /**
- * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using default
- * serializers and deserializers and a customizable {@link StreamPartitioner} to determine the distribution of
- * records to partitions.
- * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
- * started).
- * <p>
- * This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and
- * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
- * <p>
- * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
- * {@link KStreamBuilder#table(String, String)})
- *
- * @param partitioner the function used to determine how records are distributed among partitions of the topic,
- * if not specified producer's {@link DefaultPartitioner} will be used
- * @param topic the topic name
- * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
- * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
- * @deprecated use {@link #toStream()} followed by
- * {@link KStream#to(String, Produced) to(topic, Produced.streamPartitioner(partitioner))} and
- * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(KeyValueBytesStoreSupplier)}
- * to read back as a {@code KTable}
- */
- @Deprecated
- KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
- final String topic,
- final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
-
- /**
- * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic.
- * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
- * started).
- * <p>
- * If {@code keySerde} provides a {@link WindowedSerializer} for the key {@link WindowedStreamPartitioner} is
- * used—otherwise producer's {@link DefaultPartitioner} is used.
- * <p>
- * This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde, someTopicName)} and
- * {@link KStreamBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
- * <p>
- * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
- * {@link KStreamBuilder#table(String, String)})
- *
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valSerde value serde used to send key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param topic the topic name
- * @param queryableStoreName the state store name used for the result {@code KTable}.
- * If {@code null} this is the equivalent of {@link KTable#through(Serde, Serde, String)}
- * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
- * @deprecated use {@link #toStream()} followed by
- * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde))} and
- * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(queryableStoreName))}
- * to read back as a {@code KTable}
- */
- @Deprecated
- KTable<K, V> through(final Serde<K> keySerde,
- final Serde<V> valSerde,
- final String topic,
- final String queryableStoreName);
-
- /**
- * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic.
- * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
- * started).
- * <p>
- * If {@code keySerde} provides a {@link WindowedSerializer} for the key {@link WindowedStreamPartitioner} is
- * used—otherwise producer's {@link DefaultPartitioner} is used.
- * <p>
- * This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde, someTopicName)} and
- * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
- * <p>
- * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
- * {@link StreamsBuilder#table(String, Materialized)})
- *
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valSerde value serde used to send key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param topic the topic name
- * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
- * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
- * @deprecated use {@link #toStream()} followed by
- * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde))} and
- * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(KeyValueBytesStoreSupplier)}
- * to read back as a {@code KTable}
- */
- @Deprecated
- KTable<K, V> through(final Serde<K> keySerde,
- final Serde<V> valSerde,
- final String topic,
- final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
-
- /**
- * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic.
- * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
- * started).
- * <p>
- * If {@code keySerde} provides a {@link WindowedSerializer} for the key {@link WindowedStreamPartitioner} is
- * used—otherwise producer's {@link DefaultPartitioner} is used.
- * <p>
- * This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde, someTopicName)} and
- * {@link KStreamBuilder#table(String) KStreamBuilder#table(someTopicName)}.
- * <p>
- * The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf.
- * {@link KStreamBuilder#table(String)})
- *
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valSerde value serde used to send key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param topic the topic name
- * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
- * @deprecated use {@link #toStream()} followed by
- * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde))}
- * and {@link StreamsBuilder#table(String) StreamsBuilder#table(topic)} to read back as a {@code KTable}
- */
- @Deprecated
- KTable<K, V> through(final Serde<K> keySerde,
- final Serde<V> valSerde,
- final String topic);
-
- /**
- * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using a customizable
- * {@link StreamPartitioner} to determine the distribution of records to partitions.
- * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
- * started).
- * <p>
- * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)
- * #to(keySerde, valueSerde, partitioner, someTopicName)} and
- * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
- * <p>
- * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
- * {@link KStreamBuilder#table(String, String)})
- *
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valSerde value serde used to send key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param partitioner the function used to determine how records are distributed among partitions of the topic,
- * if not specified and {@code keySerde} provides a {@link WindowedSerializer} for the key
- * {@link WindowedStreamPartitioner} will be used—otherwise {@link DefaultPartitioner} will
- * be used
- * @param topic the topic name
- * @param queryableStoreName the state store name used for the result {@code KTable}.
- * If {@code null} this is the equivalent of {@link KTable#through(Serde, Serde, StreamPartitioner, String)()}
- * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
- * @deprecated use {@link #toStream()} followed by
- * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde, partitioner))} and
- * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(queryableStoreName))}
- * to read back as a {@code KTable}
- */
- @Deprecated
- KTable<K, V> through(final Serde<K> keySerde,
- final Serde<V> valSerde,
- final StreamPartitioner<? super K, ? super V> partitioner,
- final String topic,
- final String queryableStoreName);
-
- /**
- * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using a customizable
- * {@link StreamPartitioner} to determine the distribution of records to partitions.
- * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
- * started).
- * <p>
- * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)
- * #to(keySerde, valueSerde, partitioner, someTopicName)} and
- * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
- * <p>
- * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
- * {@link KStreamBuilder#table(String, String)})
- *
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valSerde value serde used to send key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param partitioner the function used to determine how records are distributed among partitions of the topic,
- * if not specified and {@code keySerde} provides a {@link WindowedSerializer} for the key
- * {@link WindowedStreamPartitioner} will be used—otherwise {@link DefaultPartitioner} will
- * be used
- * @param topic the topic name
- * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
- * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
- * @deprecated use {@link #toStream()} followed by
- * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde, partitioner))} and
- * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(KeyValueBytesStoreSupplier))}
- * to read back as a {@code KTable}
- */
- @Deprecated
- KTable<K, V> through(final Serde<K> keySerde,
- final Serde<V> valSerde,
- final StreamPartitioner<? super K, ? super V> partitioner,
- final String topic,
- final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
-
- /**
- * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using a customizable
- * {@link StreamPartitioner} to determine the distribution of records to partitions.
- * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
- * started).
- * <p>
- * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)
- * #to(keySerde, valueSerde, partitioner, someTopicName)} and
- * {@link KStreamBuilder#table(String) KStreamBuilder#table(someTopicName)}.
- * <p>
- * The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf.
- * {@link KStreamBuilder#table(String)})
- *
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valSerde value serde used to send key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param partitioner the function used to determine how records are distributed among partitions of the topic,
- * if not specified and {@code keySerde} provides a {@link WindowedSerializer} for the key
- * {@link WindowedStreamPartitioner} will be used—otherwise {@link DefaultPartitioner} will
- * be used
- * @param topic the topic name
- * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
- * @deprecated use {@link #toStream()} followed by
- * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde, partitioner))} and
- * {@link StreamsBuilder#table(String) StreamsBuilder#table(topic)} to read back as a {@code KTable}
- */
- @Deprecated
- KTable<K, V> through(final Serde<K> keySerde,
- final Serde<V> valSerde,
- final StreamPartitioner<? super K, ? super V> partitioner,
- final String topic);
-
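Each removed through() overload decomposes into an explicit write plus a re-read, as the deprecation notes describe. A minimal sketch, assuming a StreamsBuilder named builder, a KTable<String, Long> table, serdes keySerde and valSerde, and hypothetical topic and store names:

    // Instead of table.through(keySerde, valSerde, "target-topic", "target-store"):
    table.toStream().to("target-topic", Produced.with(keySerde, valSerde));
    final KTable<String, Long> readBack = builder.table(
        "target-topic",
        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("target-store")
            .withKeySerde(keySerde)
            .withValueSerde(valSerde));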
- /**
- * Materialize this changelog stream to a topic using default serializers and deserializers and producer's
- * {@link DefaultPartitioner}.
- * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
- * started).
- *
- * @param topic the topic name
- * @deprecated use {@link #toStream()} followed by {@link KStream#to(String) to(topic)}
- */
- @Deprecated
- void to(final String topic);
-
- /**
- * Materialize this changelog stream to a topic using default serializers and deserializers and a customizable
- * {@link StreamPartitioner} to determine the distribution of records to partitions.
- * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
- * started).
- *
- * @param partitioner the function used to determine how records are distributed among partitions of the topic,
- * if not specified producer's {@link DefaultPartitioner} will be used
- * @param topic the topic name
- * @deprecated use {@link #toStream()} followed by
- * {@link KStream#to(String, Produced) to(topic, Produced.withStreamPartitioner(partitioner)}
- */
- @Deprecated
- void to(final StreamPartitioner<? super K, ? super V> partitioner,
- final String topic);
-
- /**
- * Materialize this changelog stream to a topic.
- * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
- * started).
- * <p>
- * If {@code keySerde} provides a {@link WindowedSerializer} for the key {@link WindowedStreamPartitioner} is
- * used—otherwise producer's {@link DefaultPartitioner} is used.
- *
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valSerde value serde used to send key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param topic the topic name
- * @deprecated use {@link #toStream()} followed by
- * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde)}
- */
- @Deprecated
- void to(final Serde<K> keySerde,
- final Serde<V> valSerde,
- final String topic);
-
- /**
- * Materialize this changelog stream to a topic using a customizable {@link StreamPartitioner} to determine the
- * distribution of records to partitions.
- * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
- * started).
- *
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valSerde value serde used to send key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param partitioner the function used to determine how records are distributed among partitions of the topic,
- * if not specified and {@code keySerde} provides a {@link WindowedSerializer} for the key
- * {@link WindowedStreamPartitioner} will be used—otherwise {@link DefaultPartitioner} will
- * be used
- * @param topic the topic name
- * @deprecated use {@link #toStream()} followed by
- * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde, partitioner))}
- */
- @Deprecated
- void to(final Serde<K> keySerde,
- final Serde<V> valSerde,
- final StreamPartitioner<? super K, ? super V> partitioner,
- final String topic);
-
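The removed to() overloads correspond directly to KStream#to(); a sketch with a hypothetical topic name, serdes, and partitioner, assuming a KTable<String, Long> table:

    // Instead of table.to("target-topic"):
    table.toStream().to("target-topic");

    // Instead of table.to(keySerde, valSerde, partitioner, "target-topic"):
    table.toStream().to("target-topic", Produced.with(keySerde, valSerde, partitioner));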
- /**
* Re-groups the records of this {@code KTable} using the provided {@link KeyValueMapper} and default serializers
* and deserializers.
* Each {@link KeyValue} pair of this {@code KTable} is mapped to a new {@link KeyValue} pair by applying the
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 11b8c51..785f73a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -18,10 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.errors.TopologyException;
-import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
@@ -32,14 +29,10 @@ import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
-import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
import java.util.Objects;
import java.util.Set;
@@ -59,8 +52,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
private static final String FILTER_NAME = "KTABLE-FILTER-";
- private static final String FOREACH_NAME = "KTABLE-FOREACH-";
-
private static final String JOINTHIS_NAME = "KTABLE-JOINTHIS-";
private static final String JOINOTHER_NAME = "KTABLE-JOINOTHER-";
@@ -69,16 +60,12 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
private static final String MERGE_NAME = "KTABLE-MERGE-";
- private static final String PRINTING_NAME = "KSTREAM-PRINTER-";
-
private static final String SELECT_NAME = "KTABLE-SELECT-";
private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
private final ProcessorSupplier<?, ?> processorSupplier;
- private final KeyValueMapper<K, V, String> defaultKeyValueMapper;
-
private final String queryableStoreName;
private final boolean isQueryable;
@@ -98,12 +85,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
this.keySerde = null;
this.valSerde = null;
this.isQueryable = isQueryable;
- this.defaultKeyValueMapper = new KeyValueMapper<K, V, String>() {
- @Override
- public String apply(K key, V value) {
- return String.format("%s, %s", key, value);
- }
- };
}
public KTableImpl(final InternalStreamsBuilder builder,
@@ -120,12 +101,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
this.keySerde = keySerde;
this.valSerde = valSerde;
this.isQueryable = isQueryable;
- this.defaultKeyValueMapper = new KeyValueMapper<K, V, String>() {
- @Override
- public String apply(K key, V value) {
- return String.format("%s, %s", key, value);
- }
- };
}
@Override
@@ -318,239 +293,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return doMapValues(withKey(mapper), valueSerde, storeSupplier);
}
- @SuppressWarnings("deprecation")
- @Override
- public void print() {
- print(null, null, this.name);
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public void print(final String label) {
- print(null, null, label);
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public void print(final Serde<K> keySerde,
- final Serde<V> valSerde) {
- print(keySerde, valSerde, this.name);
- }
-
- @SuppressWarnings({"unchecked", "deprecation"})
- @Override
- public void print(final Serde<K> keySerde,
- final Serde<V> valSerde,
- final String label) {
- Objects.requireNonNull(label, "label can't be null");
- final String name = builder.newProcessorName(PRINTING_NAME);
- builder.internalTopologyBuilder.addProcessor(
- name,
- new KStreamPrint<>(new PrintForeachAction<>(System.out, defaultKeyValueMapper, label)),
- this.name);
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public void writeAsText(final String filePath) {
- writeAsText(filePath, this.name, null, null);
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public void writeAsText(final String filePath,
- final String label) {
- writeAsText(filePath, label, null, null);
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public void writeAsText(final String filePath,
- final Serde<K> keySerde,
- final Serde<V> valSerde) {
- writeAsText(filePath, this.name, keySerde, valSerde);
- }
-
- /**
- * @throws TopologyException if file is not found
- */
- @SuppressWarnings({"unchecked", "deprecation"})
- @Override
- public void writeAsText(final String filePath,
- final String label,
- final Serde<K> keySerde,
- final Serde<V> valSerde) {
- Objects.requireNonNull(filePath, "filePath can't be null");
- Objects.requireNonNull(label, "label can't be null");
- if (filePath.trim().isEmpty()) {
- throw new TopologyException("filePath can't be an empty string");
- }
- final String name = builder.newProcessorName(PRINTING_NAME);
- try {
- builder.internalTopologyBuilder.addProcessor(
- name,
- new KStreamPrint<>(new PrintForeachAction<>(new FileOutputStream(filePath), defaultKeyValueMapper, label)),
- this.name);
- } catch (final FileNotFoundException e) {
- throw new TopologyException(String.format("Unable to write stream to file at [%s] %s", filePath, e.getMessage()));
- }
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public void foreach(final ForeachAction<? super K, ? super V> action) {
- Objects.requireNonNull(action, "action can't be null");
- String name = builder.newProcessorName(FOREACH_NAME);
- KStreamPeek<K, Change<V>> processorSupplier = new KStreamPeek<>(new ForeachAction<K, Change<V>>() {
- @Override
- public void apply(K key, Change<V> value) {
- action.apply(key, value.newValue);
- }
- }, false);
- builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public KTable<K, V> through(final Serde<K> keySerde,
- final Serde<V> valSerde,
- final StreamPartitioner<? super K, ? super V> partitioner,
- final String topic,
- final String queryableStoreName) {
- to(keySerde, valSerde, partitioner, topic);
-
- return builder.table(topic,
- new ConsumedInternal<>(keySerde, valSerde, new FailOnInvalidTimestamp(), null),
- new MaterializedInternal<>(Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(keySerde, valSerde),
- builder,
- KTableImpl.TOSTREAM_NAME));
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public KTable<K, V> through(final Serde<K> keySerde,
- final Serde<V> valSerde,
- final StreamPartitioner<? super K, ? super V> partitioner,
- final String topic,
- final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
- Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
- to(keySerde, valSerde, partitioner, topic);
-
- final ConsumedInternal<K, V> consumed = new ConsumedInternal<>(Consumed.with(keySerde, valSerde, new FailOnInvalidTimestamp(), null));
- return builder.table(topic, consumed, storeSupplier);
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public KTable<K, V> through(final Serde<K> keySerde,
- final Serde<V> valSerde,
- final StreamPartitioner<? super K, ? super V> partitioner,
- final String topic) {
- return through(keySerde, valSerde, partitioner, topic, (String) null);
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public KTable<K, V> through(final Serde<K> keySerde,
- final Serde<V> valSerde,
- final String topic,
- final String queryableStoreName) {
- return through(keySerde, valSerde, null, topic, queryableStoreName);
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public KTable<K, V> through(final Serde<K> keySerde,
- final Serde<V> valSerde,
- final String topic,
- final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
- Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
- return through(keySerde, valSerde, null, topic, storeSupplier);
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public KTable<K, V> through(final Serde<K> keySerde,
- final Serde<V> valSerde,
- final String topic) {
- return through(keySerde, valSerde, null, topic, (String) null);
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
- final String topic,
- final String queryableStoreName) {
- return through(null, null, partitioner, topic, queryableStoreName);
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
- final String topic,
- final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
- Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
- return through(null, null, partitioner, topic, storeSupplier);
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
- final String topic) {
- return through(null, null, partitioner, topic, (String) null);
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public KTable<K, V> through(final String topic,
- final String queryableStoreName) {
- return through(null, null, null, topic, queryableStoreName);
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public KTable<K, V> through(final String topic,
- final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
- Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
- return through(null, null, null, topic, storeSupplier);
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public KTable<K, V> through(final String topic) {
- return through(null, null, null, topic, (String) null);
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public void to(final String topic) {
- to(null, null, null, topic);
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public void to(final StreamPartitioner<? super K, ? super V> partitioner,
- final String topic) {
- to(null, null, partitioner, topic);
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public void to(final Serde<K> keySerde,
- final Serde<V> valSerde,
- final String topic) {
- this.toStream().to(keySerde, valSerde, null, topic);
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public void to(final Serde<K> keySerde,
- final Serde<V> valSerde,
- final StreamPartitioner<? super K, ? super V> partitioner,
- final String topic) {
- this.toStream().to(keySerde, valSerde, partitioner, topic);
- }
-
@Override
public KStream<K, V> toStream() {
String name = builder.newProcessorName(TOSTREAM_NAME);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 44e139a..51bbb95 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -126,9 +126,9 @@ public class KStreamAggregationDedupIntegrationTest {
@Test
public void shouldReduce() throws Exception {
produceMessages(System.currentTimeMillis());
- groupedStream
- .reduce(reducer, "reduce-by-key")
- .to(Serdes.String(), Serdes.String(), outputTopic);
+ groupedStream.reduce(reducer, "reduce-by-key")
+ .toStream()
+ .to(Serdes.String(), Serdes.String(), outputTopic);
startStreams();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 52b9ee8..2efe9f2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -168,6 +168,7 @@ public class KStreamAggregationIntegrationTest {
produceMessages(mockTime.milliseconds());
groupedStream
.reduce(reducer, "reduce-by-key")
+ .toStream()
.to(Serdes.String(), Serdes.String(), outputTopic);
startStreams();
@@ -294,6 +295,7 @@ public class KStreamAggregationIntegrationTest {
aggregator,
Serdes.Integer(),
"aggregate-by-selected-key")
+ .toStream()
.to(Serdes.String(), Serdes.Integer(), outputTopic);
startStreams();
@@ -445,7 +447,8 @@ public class KStreamAggregationIntegrationTest {
produceMessages(mockTime.milliseconds());
groupedStream.count("count-by-key")
- .to(Serdes.String(), Serdes.Long(), outputTopic);
+ .toStream()
+ .to(Serdes.String(), Serdes.Long(), outputTopic);
shouldCountHelper();
}
@@ -456,7 +459,8 @@ public class KStreamAggregationIntegrationTest {
produceMessages(mockTime.milliseconds());
groupedStream.count()
- .to(Serdes.String(), Serdes.Long(), outputTopic);
+ .toStream()
+ .to(Serdes.String(), Serdes.Long(), outputTopic);
shouldCountHelper();
}
@@ -667,6 +671,7 @@ public class KStreamAggregationIntegrationTest {
return value1 + ":" + value2;
}
}, SessionWindows.with(sessionGap).until(maintainMillis), userSessionsStore)
+ .toStream()
.foreach(new ForeachAction<Windowed<String>, String>() {
@Override
public void apply(final Windowed<String> key, final String value) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 05d339f..b614732 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -124,7 +124,7 @@ public class KGroupedTableImplTest {
private Map<String, Integer> getReducedResults(final KTable<String, Integer> inputKTable) {
final Map<String, Integer> reducedResults = new HashMap<>();
- inputKTable.foreach(new ForeachAction<String, Integer>() {
+ inputKTable.toStream().foreach(new ForeachAction<String, Integer>() {
@Override
public void apply(final String key, final Integer value) {
reducedResults.put(key, value);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index bde771b..2eecbc1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -506,6 +506,7 @@ public class KTableFilterTest {
.<Integer, String>table("empty")
.filter(numberKeyPredicate)
.filterNot(numberKeyPredicate)
+ .toStream()
.to("nirvana");
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
deleted file mode 100644
index a6b6c64..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.Consumed;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.test.ConsumerRecordFactory;
-import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Locale;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-
-@Deprecated
-public class KTableForeachTest {
-
- final private String topicName = "topic";
- private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
- private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
-
- @Test
- public void testForeach() {
- // Given
- List<KeyValue<Integer, String>> inputRecords = Arrays.asList(
- new KeyValue<>(0, "zero"),
- new KeyValue<>(1, "one"),
- new KeyValue<>(2, "two"),
- new KeyValue<>(3, "three")
- );
-
- List<KeyValue<Integer, String>> expectedRecords = Arrays.asList(
- new KeyValue<>(0, "ZERO"),
- new KeyValue<>(2, "ONE"),
- new KeyValue<>(4, "TWO"),
- new KeyValue<>(6, "THREE")
- );
-
- final List<KeyValue<Integer, String>> actualRecords = new ArrayList<>();
- ForeachAction<Integer, String> action =
- new ForeachAction<Integer, String>() {
- @Override
- public void apply(Integer key, String value) {
- actualRecords.add(new KeyValue<>(key * 2, value.toUpperCase(Locale.ROOT)));
- }
- };
-
- // When
- StreamsBuilder builder = new StreamsBuilder();
- KTable<Integer, String> table = builder.table(topicName,
- Consumed.with(Serdes.Integer(), Serdes.String()),
- Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as(topicName)
- .withKeySerde(Serdes.Integer())
- .withValueSerde(Serdes.String()));
- table.foreach(action);
-
- // Then
- try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
- for (KeyValue<Integer, String> record : inputRecords) {
- driver.pipeInput(recordFactory.create(topicName, record.key, record.value));
- }
- }
-
- assertEquals(expectedRecords.size(), actualRecords.size());
- for (int i = 0; i < expectedRecords.size(); i++) {
- KeyValue<Integer, String> expectedRecord = expectedRecords.get(i);
- KeyValue<Integer, String> actualRecord = actualRecords.get(i);
- assertEquals(expectedRecord, actualRecord);
- }
- }
-
- @Test
- public void testTypeVariance() {
- ForeachAction<Number, Object> consume = new ForeachAction<Number, Object>() {
- @Override
- public void apply(Number key, Object value) {}
- };
-
- new StreamsBuilder()
- .<Integer, String>table("emptyTopic")
- .foreach(consume);
- }
-}
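The deleted KTableForeachTest's coverage can still be expressed after this commit; only the operator call changes, since the same ForeachAction now hangs off the stream view. A sketch reusing the deleted test's own fixtures (topicName, action, recordFactory, props, inputRecords):

    // Sketch only: same fixtures as the deleted test, foreach moved to toStream().
    final StreamsBuilder builder = new StreamsBuilder();
    final KTable<Integer, String> table =
        builder.table(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
    table.toStream().foreach(action);   // formerly table.foreach(action)

    try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
        for (final KeyValue<Integer, String> record : inputRecords) {
            driver.pipeInput(recordFactory.create(topicName, record.key, record.value));
        }
    }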
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index ae1e285..fcdd0a7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -22,10 +22,10 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
@@ -57,8 +57,10 @@ import static org.junit.Assert.assertTrue;
public class KTableImplTest {
- final private Serde<String> stringSerde = Serdes.String();
+ private final Serde<String> stringSerde = Serdes.String();
private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
+ private final Produced<String, String> produced = Produced.with(stringSerde, stringSerde);
+
@Rule
public final KStreamTestDriver driver = new KStreamTestDriver();
private File stateDir = null;
@@ -78,14 +80,13 @@ public class KTableImplTest {
String topic1 = "topic1";
String topic2 = "topic2";
- String storeName2 = "storeName2";
- KTable<String, String> table1 = builder.table(topic1, consumed);
+ final KTable<String, String> table1 = builder.table(topic1, consumed);
- MockProcessorSupplier<String, Object> supplier = new MockProcessorSupplier<>();
+ final MockProcessorSupplier<String, Object> supplier = new MockProcessorSupplier<>();
table1.toStream().process(supplier);
- KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<String, Integer>() {
+ final KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<String, Integer>() {
@Override
public Integer apply(String value) {
return new Integer(value);
@@ -94,7 +95,7 @@ public class KTableImplTest {
table2.toStream().process(supplier);
- KTable<String, Integer> table3 = table2.filter(new Predicate<String, Integer>() {
+ final KTable<String, Integer> table3 = table2.filter(new Predicate<String, Integer>() {
@Override
public boolean test(String key, Integer value) {
return (value % 2) == 0;
@@ -103,7 +104,8 @@ public class KTableImplTest {
table3.toStream().process(supplier);
- KTable<String, String> table4 = table1.through(stringSerde, stringSerde, topic2, storeName2);
+ table1.toStream().to(topic2, produced);
+ final KTable<String, String> table4 = builder.table(topic2, consumed);
table4.toStream().process(supplier);
@@ -130,46 +132,46 @@ public class KTableImplTest {
public void testValueGetter() {
final StreamsBuilder builder = new StreamsBuilder();
- String topic1 = "topic1";
- String topic2 = "topic2";
- String storeName2 = "storeName2";
+ final String topic1 = "topic1";
+ final String topic2 = "topic2";
- KTableImpl<String, String, String> table1 =
+ final KTableImpl<String, String, String> table1 =
(KTableImpl<String, String, String>) builder.table(topic1, consumed);
- KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+ final KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
new ValueMapper<String, Integer>() {
@Override
public Integer apply(String value) {
return new Integer(value);
}
});
- KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
+ final KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
new Predicate<String, Integer>() {
@Override
public boolean test(String key, Integer value) {
return (value % 2) == 0;
}
});
- KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
- table1.through(stringSerde, stringSerde, topic2, storeName2);
- KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
- KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
- KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
- KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
+ table1.toStream().to(topic2, produced);
+ final KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>) builder.table(topic2, consumed);
+
+ final KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
+ final KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
+ final KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
+ final KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
driver.setUp(builder, stateDir, null, null);
// two state store should be created
assertEquals(2, driver.allStateStores().size());
- KTableValueGetter<String, String> getter1 = getterSupplier1.get();
+ final KTableValueGetter<String, String> getter1 = getterSupplier1.get();
getter1.init(driver.context());
- KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
+ final KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
getter2.init(driver.context());
- KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
+ final KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
getter3.init(driver.context());
- KTableValueGetter<String, String> getter4 = getterSupplier4.get();
+ final KTableValueGetter<String, String> getter4 = getterSupplier4.get();
getter4.init(driver.context());
driver.process(topic1, "A", "01");
@@ -255,16 +257,16 @@ public class KTableImplTest {
@Test
public void testStateStoreLazyEval() {
- String topic1 = "topic1";
- String topic2 = "topic2";
+ final String topic1 = "topic1";
+ final String topic2 = "topic2";
final StreamsBuilder builder = new StreamsBuilder();
- KTableImpl<String, String, String> table1 =
+ final KTableImpl<String, String, String> table1 =
(KTableImpl<String, String, String>) builder.table(topic1, consumed);
builder.table(topic2, consumed);
- KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
+ final KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
new ValueMapper<String, Integer>() {
@Override
public Integer apply(String value) {
@@ -288,24 +290,24 @@ public class KTableImplTest {
@Test
public void testStateStore() {
- String topic1 = "topic1";
- String topic2 = "topic2";
+ final String topic1 = "topic1";
+ final String topic2 = "topic2";
final StreamsBuilder builder = new StreamsBuilder();
- KTableImpl<String, String, String> table1 =
+ final KTableImpl<String, String, String> table1 =
(KTableImpl<String, String, String>) builder.table(topic1, consumed);
- KTableImpl<String, String, String> table2 =
+ final KTableImpl<String, String, String> table2 =
(KTableImpl<String, String, String>) builder.table(topic2, consumed);
- KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
+ final KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
new ValueMapper<String, Integer>() {
@Override
public Integer apply(String value) {
return new Integer(value);
}
});
- KTableImpl<String, Integer, Integer> table1MappedFiltered = (KTableImpl<String, Integer, Integer>) table1Mapped.filter(
+ final KTableImpl<String, Integer, Integer> table1MappedFiltered = (KTableImpl<String, Integer, Integer>) table1Mapped.filter(
new Predicate<String, Integer>() {
@Override
public boolean test(String key, Integer value) {
@@ -329,12 +331,12 @@ public class KTableImplTest {
@Test
public void testRepartition() throws NoSuchFieldException, IllegalAccessException {
- String topic1 = "topic1";
- String storeName1 = "storeName1";
+ final String topic1 = "topic1";
+ final String storeName1 = "storeName1";
final StreamsBuilder builder = new StreamsBuilder();
- KTableImpl<String, String, String> table1 =
+ final KTableImpl<String, String, String> table1 =
(KTableImpl<String, String, String>) builder.table(topic1,
consumed,
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName1)
@@ -378,11 +380,6 @@ public class KTableImplTest {
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullTopicOnTo() {
- table.to(null);
- }
-
- @Test(expected = NullPointerException.class)
public void shouldNotAllowNullPredicateOnFilter() {
table.filter(null);
}
@@ -402,34 +399,6 @@ public class KTableImplTest {
table.mapValues((ValueMapperWithKey) null);
}
- @SuppressWarnings("deprecation")
- @Test(expected = NullPointerException.class)
- public void shouldNotAllowNullFilePathOnWriteAsText() {
- table.writeAsText(null);
- }
-
- @SuppressWarnings("deprecation")
- @Test(expected = TopologyException.class)
- public void shouldNotAllowEmptyFilePathOnWriteAsText() {
- table.writeAsText("\t \t");
- }
-
- @SuppressWarnings("deprecation")
- @Test(expected = NullPointerException.class)
- public void shouldNotAllowNullActionOnForEach() {
- table.foreach(null);
- }
-
- @Test(expected = NullPointerException.class)
- public void shouldAllowNullTopicInThrough() {
- table.through((String) null, "store");
- }
-
- @Test
- public void shouldAllowNullStoreInThrough() {
- table.through("topic", (String) null);
- }
-
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullSelectorOnGroupBy() {
table.groupBy(null);
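Each former through() call in this test is now written out as an explicit round trip, exactly as the hunks above show: publish the table's changelog to the intermediate topic, then read that topic back as a new table. In isolation the rewrite is:

    // Before (removed):
    //   final KTable<String, String> table4 =
    //       table1.through(stringSerde, stringSerde, topic2, storeName2);
    // After, as in the updated test:
    table1.toStream().to(topic2, Produced.with(stringSerde, stringSerde));
    final KTable<String, String> table4 = builder.table(topic2, Consumed.with(stringSerde, stringSerde));

If the explicitly named store from the old through() call is still needed, builder.table also accepts a Materialized argument; the updated tests simply rely on the default materialization.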
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 3cd7701..c54efd8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.KStreamTestDriver;
@@ -44,8 +45,9 @@ import static org.junit.Assert.assertTrue;
public class KTableMapValuesTest {
- final private Serde<String> stringSerde = Serdes.String();
+ private final Serde<String> stringSerde = Serdes.String();
private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
+ private final Produced<String, String> produced = Produced.with(stringSerde, stringSerde);
@Rule
public final KStreamTestDriver driver = new KStreamTestDriver();
private File stateDir = null;
@@ -70,17 +72,17 @@ public class KTableMapValuesTest {
public void testKTable() {
final StreamsBuilder builder = new StreamsBuilder();
- String topic1 = "topic1";
+ final String topic1 = "topic1";
- KTable<String, String> table1 = builder.table(topic1, consumed);
- KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<CharSequence, Integer>() {
+ final KTable<String, String> table1 = builder.table(topic1, consumed);
+ final KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<CharSequence, Integer>() {
@Override
public Integer apply(CharSequence value) {
return value.charAt(0) - 48;
}
});
- MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+ final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
table2.toStream().process(supplier);
doTestKTable(builder, topic1, supplier);
@@ -90,17 +92,17 @@ public class KTableMapValuesTest {
public void testQueryableKTable() {
final StreamsBuilder builder = new StreamsBuilder();
- String topic1 = "topic1";
+ final String topic1 = "topic1";
- KTable<String, String> table1 = builder.table(topic1, consumed);
- KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<CharSequence, Integer>() {
+ final KTable<String, String> table1 = builder.table(topic1, consumed);
+ final KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<CharSequence, Integer>() {
@Override
public Integer apply(CharSequence value) {
return value.charAt(0) - 48;
}
}, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyName").withValueSerde(Serdes.Integer()));
- MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+ final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
table2.toStream().process(supplier);
doTestKTable(builder, topic1, supplier);
@@ -112,19 +114,19 @@ public class KTableMapValuesTest {
final KTableImpl<String, String, Integer> table2,
final KTableImpl<String, Integer, Integer> table3,
final KTableImpl<String, String, String> table4) {
- KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
- KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
- KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
- KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
+ final KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
+ final KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
+ final KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
+ final KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
driver.setUp(builder, stateDir, Serdes.String(), Serdes.String());
- KTableValueGetter<String, String> getter1 = getterSupplier1.get();
+ final KTableValueGetter<String, String> getter1 = getterSupplier1.get();
getter1.init(driver.context());
- KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
+ final KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
getter2.init(driver.context());
- KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
+ final KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
getter3.init(driver.context());
- KTableValueGetter<String, String> getter4 = getterSupplier4.get();
+ final KTableValueGetter<String, String> getter4 = getterSupplier4.get();
getter4.init(driver.context());
driver.process(topic1, "A", "01");
@@ -209,73 +211,71 @@ public class KTableMapValuesTest {
@Test
public void testValueGetter() {
- StreamsBuilder builder = new StreamsBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
- String topic1 = "topic1";
- String topic2 = "topic2";
- String storeName2 = "storeName2";
+ final String topic1 = "topic1";
+ final String topic2 = "topic2";
- KTableImpl<String, String, String> table1 =
+ final KTableImpl<String, String, String> table1 =
(KTableImpl<String, String, String>) builder.table(topic1, consumed);
- KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+ final KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
new ValueMapper<String, Integer>() {
@Override
public Integer apply(String value) {
return new Integer(value);
}
});
- KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
+ final KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
new Predicate<String, Integer>() {
@Override
public boolean test(String key, Integer value) {
return (value % 2) == 0;
}
});
- KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
- table1.through(stringSerde, stringSerde, topic2, storeName2);
+ table1.toStream().to(topic2, produced);
+ final KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>) builder.table(topic2, consumed);
doTestValueGetter(builder, topic1, table1, table2, table3, table4);
}
@Test
public void testQueryableValueGetter() {
- StreamsBuilder builder = new StreamsBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
- String topic1 = "topic1";
- String topic2 = "topic2";
- String storeName2 = "storeName2";
+ final String topic1 = "topic1";
+ final String topic2 = "topic2";
- KTableImpl<String, String, String> table1 =
+ final KTableImpl<String, String, String> table1 =
(KTableImpl<String, String, String>) builder.table(topic1, consumed);
- KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+ final KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
new ValueMapper<String, Integer>() {
@Override
public Integer apply(String value) {
return new Integer(value);
}
}, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyMapName").withValueSerde(Serdes.Integer()));
- KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
+ final KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
new Predicate<String, Integer>() {
@Override
public boolean test(String key, Integer value) {
return (value % 2) == 0;
}
}, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyFilterName").withValueSerde(Serdes.Integer()));
- KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
- table1.through(stringSerde, stringSerde, topic2, storeName2);
+ table1.toStream().to(topic2, produced);
+ final KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>) builder.table(topic2, consumed);
doTestValueGetter(builder, topic1, table1, table2, table3, table4);
}
@Test
public void testNotSendingOldValue() {
- StreamsBuilder builder = new StreamsBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
- String topic1 = "topic1";
+ final String topic1 = "topic1";
- KTableImpl<String, String, String> table1 =
+ final KTableImpl<String, String, String> table1 =
(KTableImpl<String, String, String>) builder.table(topic1, consumed);
- KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+ final KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
new ValueMapper<String, Integer>() {
@Override
public Integer apply(String value) {
@@ -320,13 +320,13 @@ public class KTableMapValuesTest {
@Test
public void testSendingOldValue() {
- StreamsBuilder builder = new StreamsBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
- String topic1 = "topic1";
+ final String topic1 = "topic1";
- KTableImpl<String, String, String> table1 =
+ final KTableImpl<String, String, String> table1 =
(KTableImpl<String, String, String>) builder.table(topic1, consumed);
- KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+ final KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
new ValueMapper<String, Integer>() {
@Override
public Integer apply(String value) {
@@ -336,7 +336,7 @@ public class KTableMapValuesTest {
table2.enableSendingOldValues();
- MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+ final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
builder.build().addProcessor("proc", supplier, table2.name);
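The writeAsText null/empty-path tests removed from KTableImplTest above lose their subject along with the method itself. The comparable way to dump a table's records after this commit is the KStream printing API, again reached through toStream(); Printed is an extra import from org.apache.kafka.streams.kstream, and the label and file path below are placeholders:

    // Illustration only, assuming a KTable<String, String> named table.
    // Before (removed): table.writeAsText("/tmp/table-dump.txt");
    table.toStream().print(Printed.<String, String>toFile("/tmp/table-dump.txt"));
    // Console output works the same way, with an optional label:
    table.toStream().print(Printed.<String, String>toSysOut().withLabel("table"));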