You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/04/26 22:15:32 UTC
[kafka] branch 2.1 updated: KAFKA-8254: Pass Changelog as Topic in
Suppress Serdes (#6602) (#6641)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new bc51d80 KAFKA-8254: Pass Changelog as Topic in Suppress Serdes (#6602) (#6641)
bc51d80 is described below
commit bc51d803478e90af62d242ad8cc6cdaf348d3c17
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Fri Apr 26 17:15:07 2019 -0500
KAFKA-8254: Pass Changelog as Topic in Suppress Serdes (#6602) (#6641)
Cherry-picked from #6602
Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../streams/kstream/internals/FullChangeSerde.java | 8 +-
.../streams/kstream/internals/KStreamImpl.java | 36 +++-
.../streams/kstream/internals/KTableImpl.java | 9 +-
.../suppress/KTableSuppressProcessor.java | 43 ++---
.../processor/internals/ProcessorContextImpl.java | 4 +-
.../internals/ProcessorRecordContext.java | 23 +--
.../InMemoryTimeOrderedKeyValueBuffer.java | 62 ++++--
.../state/internals/TimeOrderedKeyValueBuffer.java | 57 +++++-
.../integration/SuppressionIntegrationTest.java | 165 ++++++++++++++--
.../kstream/internals/FullChangeSerdeTest.java | 20 +-
.../suppress/KTableSuppressProcessorTest.java | 49 +++--
.../internals/AbstractProcessorContextTest.java | 4 +-
.../InMemoryTimeOrderedKeyValueBufferTest.java | 4 +-
.../internals/TimeOrderedKeyValueBufferTest.java | 208 +++++++++------------
14 files changed, 448 insertions(+), 244 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
index 9bb8373..f06a428 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
@@ -25,21 +25,21 @@ import java.util.Map;
import static java.util.Objects.requireNonNull;
-public class FullChangeSerde<T> implements Serde<Change<T>> {
+public final class FullChangeSerde<T> implements Serde<Change<T>> {
private final Serde<T> inner;
@SuppressWarnings("unchecked")
- public static <T> FullChangeSerde<T> castOrWrap(final Serde<?> serde) {
+ public static <T> FullChangeSerde<T> castOrWrap(final Serde<T> serde) {
if (serde == null) {
return null;
} else if (serde instanceof FullChangeSerde) {
return (FullChangeSerde<T>) serde;
} else {
- return new FullChangeSerde<T>((Serde<T>) serde);
+ return new FullChangeSerde<>(serde);
}
}
- public FullChangeSerde(final Serde<T> inner) {
+ private FullChangeSerde(final Serde<T> inner) {
this.inner = requireNonNull(inner);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 26ea63c..88529fb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -392,18 +392,26 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
@Override
public KStream<K, V> through(final String topic) {
- return through(topic, Produced.with(null, null, null));
+ return through(topic, Produced.with(keySerde, valSerde, null));
}
@Override
public KStream<K, V> through(final String topic, final Produced<K, V> produced) {
+ Objects.requireNonNull(topic, "topic can't be null");
+ Objects.requireNonNull(produced, "Produced can't be null");
final ProducedInternal<K, V> producedInternal = new ProducedInternal<>(produced);
+ if (producedInternal.keySerde() == null) {
+ producedInternal.withKeySerde(keySerde);
+ }
+ if (producedInternal.valueSerde() == null) {
+ producedInternal.withValueSerde(valSerde);
+ }
to(topic, producedInternal);
return builder.stream(
Collections.singleton(topic),
new ConsumedInternal<>(
- producedInternal.keySerde() != null ? producedInternal.keySerde() : keySerde,
- producedInternal.valueSerde() != null ? producedInternal.valueSerde() : valSerde,
+ producedInternal.keySerde(),
+ producedInternal.valueSerde(),
new FailOnInvalidTimestamp(),
null
)
@@ -412,26 +420,40 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
@Override
public void to(final String topic) {
- to(topic, Produced.with(null, null, null));
+ to(topic, Produced.with(keySerde, valSerde, null));
}
@Override
public void to(final String topic, final Produced<K, V> produced) {
Objects.requireNonNull(topic, "topic can't be null");
Objects.requireNonNull(produced, "Produced can't be null");
- to(new StaticTopicNameExtractor<>(topic), new ProducedInternal<>(produced));
+ final ProducedInternal<K, V> producedInternal = new ProducedInternal<>(produced);
+ if (producedInternal.keySerde() == null) {
+ producedInternal.withKeySerde(keySerde);
+ }
+ if (producedInternal.valueSerde() == null) {
+ producedInternal.withValueSerde(valSerde);
+ }
+ to(new StaticTopicNameExtractor<>(topic), producedInternal);
}
@Override
public void to(final TopicNameExtractor<K, V> topicExtractor) {
- to(topicExtractor, Produced.with(null, null, null));
+ to(topicExtractor, Produced.with(keySerde, valSerde, null));
}
@Override
public void to(final TopicNameExtractor<K, V> topicExtractor, final Produced<K, V> produced) {
Objects.requireNonNull(topicExtractor, "topic extractor can't be null");
Objects.requireNonNull(produced, "Produced can't be null");
- to(topicExtractor, new ProducedInternal<>(produced));
+ final ProducedInternal<K, V> producedInternal = new ProducedInternal<>(produced);
+ if (producedInternal.keySerde() == null) {
+ producedInternal.withKeySerde(keySerde);
+ }
+ if (producedInternal.valueSerde() == null) {
+ producedInternal.withValueSerde(valSerde);
+ }
+ to(topicExtractor, producedInternal);
}
private void to(final TopicNameExtractor<K, V> topicExtractor, final ProducedInternal<K, V> produced) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index f49d109..25e76f6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -366,18 +366,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
suppressedInternal.name() != null ? suppressedInternal.name() + "-store" : builder.newStoreName(SUPPRESS_NAME);
final ProcessorSupplier<K, Change<V>> suppressionSupplier =
- () -> new KTableSuppressProcessor<>(
- suppressedInternal,
- storeName,
- keySerde,
- valSerde == null ? null : new FullChangeSerde<>(valSerde)
- );
+ () -> new KTableSuppressProcessor<>(suppressedInternal, storeName);
final ProcessorGraphNode<K, Change<V>> node = new StatefulProcessorNode<>(
name,
new ProcessorParameters<>(suppressionSupplier, name),
- new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName),
+ new InMemoryTimeOrderedKeyValueBuffer.Builder<>(storeName, keySerde, FullChangeSerde.castOrWrap(valSerde)),
false
);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
index 622223c..fcbc60b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
@@ -18,8 +18,6 @@ package org.apache.kafka.streams.kstream.internals.suppress;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
@@ -28,7 +26,6 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.streams.state.internals.ContextualRecord;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
import static java.util.Objects.requireNonNull;
@@ -42,22 +39,14 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
private final boolean safeToDropTombstones;
private final String storeName;
- private TimeOrderedKeyValueBuffer buffer;
+ private TimeOrderedKeyValueBuffer<K, Change<V>> buffer;
private InternalProcessorContext internalProcessorContext;
- private Serde<K> keySerde;
- private FullChangeSerde<V> valueSerde;
-
private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
- public KTableSuppressProcessor(final SuppressedInternal<K> suppress,
- final String storeName,
- final Serde<K> keySerde,
- final FullChangeSerde<V> valueSerde) {
+ public KTableSuppressProcessor(final SuppressedInternal<K> suppress, final String storeName) {
this.storeName = storeName;
requireNonNull(suppress);
- this.keySerde = keySerde;
- this.valueSerde = valueSerde;
maxRecords = suppress.bufferConfig().maxRecords();
maxBytes = suppress.bufferConfig().maxBytes();
suppressDurationMillis = suppress.timeToWaitForMoreEvents().toMillis();
@@ -70,9 +59,9 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
@Override
public void init(final ProcessorContext context) {
internalProcessorContext = (InternalProcessorContext) context;
- keySerde = keySerde == null ? (Serde<K>) context.keySerde() : keySerde;
- valueSerde = valueSerde == null ? FullChangeSerde.castOrWrap(context.valueSerde()) : valueSerde;
- buffer = requireNonNull((TimeOrderedKeyValueBuffer) context.getStateStore(storeName));
+
+ buffer = requireNonNull((TimeOrderedKeyValueBuffer<K, Change<V>>) context.getStateStore(storeName));
+ buffer.setSerdesIfNull((Serde<K>) context.keySerde(), FullChangeSerde.castOrWrap((Serde<V>) context.valueSerde()));
}
@Override
@@ -84,12 +73,7 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
private void buffer(final K key, final Change<V> value) {
final long bufferTime = bufferTimeDefinition.time(internalProcessorContext, key);
- final ProcessorRecordContext recordContext = internalProcessorContext.recordContext();
-
- final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(null, key));
- final byte[] serializedValue = valueSerde.serializer().serialize(null, value);
-
- buffer.put(bufferTime, serializedKey, new ContextualRecord(serializedValue, recordContext));
+ buffer.put(bufferTime, key, value, internalProcessorContext.recordContext());
}
private void enforceConstraints() {
@@ -110,6 +94,11 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
buffer.numRecords(), maxRecords,
buffer.bufferSize(), maxBytes
));
+ default:
+ throw new UnsupportedOperationException(
+ "The bufferFullStrategy [" + bufferFullStrategy +
+ "] is not implemented. This is a bug in Kafka Streams."
+ );
}
}
}
@@ -118,14 +107,12 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
return buffer.numRecords() > maxRecords || buffer.bufferSize() > maxBytes;
}
- private void emit(final KeyValue<Bytes, ContextualRecord> toEmit) {
- final Change<V> value = valueSerde.deserializer().deserialize(null, toEmit.value.value());
- if (shouldForward(value)) {
+ private void emit(final TimeOrderedKeyValueBuffer.Eviction<K, Change<V>> toEmit) {
+ if (shouldForward(toEmit.value())) {
final ProcessorRecordContext prevRecordContext = internalProcessorContext.recordContext();
- internalProcessorContext.setRecordContext(toEmit.value.recordContext());
+ internalProcessorContext.setRecordContext(toEmit.recordContext());
try {
- final K key = keySerde.deserializer().deserialize(null, toEmit.key.get());
- internalProcessorContext.forward(key, value);
+ internalProcessorContext.forward(toEmit.key(), toEmit.value());
} finally {
internalProcessorContext.setRecordContext(prevRecordContext);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index fa78d01..c9f2431 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -110,7 +110,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
final V value,
final To to) {
final ProcessorNode previousNode = currentNode();
- final long currentTimestamp = recordContext.timestamp;
+ final long currentTimestamp = recordContext.timestamp();
try {
toInternal.update(to);
@@ -138,7 +138,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
}
}
} finally {
- recordContext.timestamp = currentTimestamp;
+ recordContext.setTimestamp(currentTimestamp);
setCurrentNode(previousNode);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index 4f991a2..d4e1594 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -29,11 +29,11 @@ import static java.nio.charset.StandardCharsets.UTF_8;
public class ProcessorRecordContext implements RecordContext {
- long timestamp;
- final long offset;
- final String topic;
- final int partition;
- final Headers headers;
+ private long timestamp;
+ private final long offset;
+ private final String topic;
+ private final int partition;
+ private final Headers headers;
public ProcessorRecordContext(final long timestamp,
final long offset,
@@ -48,13 +48,6 @@ public class ProcessorRecordContext implements RecordContext {
this.headers = headers;
}
- public ProcessorRecordContext(final long timestamp,
- final long offset,
- final int partition,
- final String topic) {
- this(timestamp, offset, partition, topic, null);
- }
-
public void setTimestamp(final long timestamp) {
this.timestamp = timestamp;
}
@@ -221,9 +214,13 @@ public class ProcessorRecordContext implements RecordContext {
Objects.equals(headers, that.headers);
}
+ /**
+ * Equality is implemented in support of tests, *not* for use in Hash collections, since this class is mutable.
+ */
+ @Deprecated
@Override
public int hashCode() {
- return Objects.hash(timestamp, offset, topic, partition, headers);
+ throw new UnsupportedOperationException("ProcessorRecordContext is unsafe for use in Hash collections");
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index d323d97..82d07a8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -22,8 +22,8 @@ import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
@@ -47,7 +47,7 @@ import java.util.function.Supplier;
import static java.util.Objects.requireNonNull;
-public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuffer {
+public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyValueBuffer<K, V> {
private static final BytesSerializer KEY_SERIALIZER = new BytesSerializer();
private static final ByteArraySerializer VALUE_SERIALIZER = new ByteArraySerializer();
private static final RecordHeaders V_1_CHANGELOG_HEADERS =
@@ -60,6 +60,9 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa
private final String storeName;
private final boolean loggingEnabled;
+ private Serde<K> keySerde;
+ private Serde<V> valueSerde;
+
private long memBufferSize = 0L;
private long minTimestamp = Long.MAX_VALUE;
private RecordCollector collector;
@@ -69,13 +72,17 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa
private int partition;
- public static class Builder implements StoreBuilder<StateStore> {
+ public static class Builder<K, V> implements StoreBuilder<StateStore> {
private final String storeName;
+ private final Serde<K> keySerde;
+ private final Serde<V> valSerde;
private boolean loggingEnabled = true;
- public Builder(final String storeName) {
+ public Builder(final String storeName, final Serde<K> keySerde, final Serde<V> valSerde) {
this.storeName = storeName;
+ this.keySerde = keySerde;
+ this.valSerde = valSerde;
}
/**
@@ -114,8 +121,8 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa
}
@Override
- public StateStore build() {
- return new InMemoryTimeOrderedKeyValueBuffer(storeName, loggingEnabled);
+ public InMemoryTimeOrderedKeyValueBuffer<K, V> build() {
+ return new InMemoryTimeOrderedKeyValueBuffer<>(storeName, loggingEnabled, keySerde, valSerde);
}
@Override
@@ -145,8 +152,12 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa
@Override
public boolean equals(final Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
final BufferKey bufferKey = (BufferKey) o;
return time == bufferKey.time &&
Objects.equals(key, bufferKey.key);
@@ -173,9 +184,14 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa
}
}
- private InMemoryTimeOrderedKeyValueBuffer(final String storeName, final boolean loggingEnabled) {
+ private InMemoryTimeOrderedKeyValueBuffer(final String storeName,
+ final boolean loggingEnabled,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde) {
this.storeName = storeName;
this.loggingEnabled = loggingEnabled;
+ this.keySerde = keySerde;
+ this.valueSerde = valueSerde;
}
@Override
@@ -190,6 +206,12 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa
}
@Override
+ public void setSerdesIfNull(final Serde<K> keySerde, final Serde<V> valueSerde) {
+ this.keySerde = this.keySerde == null ? keySerde : this.keySerde;
+ this.valueSerde = this.valueSerde == null ? valueSerde : this.valueSerde;
+ }
+
+ @Override
public void init(final ProcessorContext context, final StateStore root) {
context.register(root, (RecordBatchingStateRestoreCallback) this::restoreBatch);
if (loggingEnabled) {
@@ -343,7 +365,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa
@Override
public void evictWhile(final Supplier<Boolean> predicate,
- final Consumer<KeyValue<Bytes, ContextualRecord>> callback) {
+ final Consumer<Eviction<K, V>> callback) {
final Iterator<Map.Entry<BufferKey, ContextualRecord>> delegate = sortedMap.entrySet().iterator();
if (predicate.get()) {
@@ -360,7 +382,9 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa
next.getKey().time + "]"
);
}
- callback.accept(new KeyValue<>(next.getKey().key, next.getValue()));
+ final K key = keySerde.deserializer().deserialize(changelogTopic, next.getKey().key.get());
+ final V value = valueSerde.deserializer().deserialize(changelogTopic, next.getValue().value());
+ callback.accept(new Eviction<>(key, value, next.getValue().recordContext()));
delegate.remove();
index.remove(next.getKey().key);
@@ -383,13 +407,17 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa
@Override
public void put(final long time,
- final Bytes key,
- final ContextualRecord contextualRecord) {
- requireNonNull(contextualRecord.value(), "value cannot be null");
- requireNonNull(contextualRecord.recordContext(), "recordContext cannot be null");
+ final K key,
+ final V value,
+ final ProcessorRecordContext recordContext) {
+ requireNonNull(value, "value cannot be null");
+ requireNonNull(recordContext, "recordContext cannot be null");
+
+ final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(changelogTopic, key));
+ final byte[] serializedValue = valueSerde.serializer().serialize(changelogTopic, value);
- cleanPut(time, key, contextualRecord);
- dirtyKeys.add(key);
+ cleanPut(time, serializedKey, new ContextualRecord(serializedValue, recordContext));
+ dirtyKeys.add(serializedKey);
}
private void cleanPut(final long time, final Bytes key, final ContextualRecord value) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
index 86a8c1e..ffa1f49 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
@@ -16,17 +16,64 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;
-public interface TimeOrderedKeyValueBuffer extends StateStore {
- void evictWhile(final Supplier<Boolean> predicate, final Consumer<KeyValue<Bytes, ContextualRecord>> callback);
+public interface TimeOrderedKeyValueBuffer<K, V> extends StateStore {
+ final class Eviction<K, V> {
+ private final K key;
+ private final V value;
+ private final ProcessorRecordContext recordContext;
- void put(final long time, final Bytes key, final ContextualRecord value);
+ Eviction(final K key, final V value, final ProcessorRecordContext recordContext) {
+ this.key = key;
+ this.value = value;
+ this.recordContext = recordContext;
+ }
+
+ public K key() {
+ return key;
+ }
+
+ public V value() {
+ return value;
+ }
+
+ public ProcessorRecordContext recordContext() {
+ return recordContext;
+ }
+
+ @Override
+ public String toString() {
+ return "Eviction{key=" + key + ", value=" + value + ", recordContext=" + recordContext + '}';
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ final Eviction<?, ?> eviction = (Eviction<?, ?>) o;
+ return Objects.equals(key, eviction.key) &&
+ Objects.equals(value, eviction.value) &&
+ Objects.equals(recordContext, eviction.recordContext);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(key, value, recordContext);
+ }
+ }
+
+ void setSerdesIfNull(final Serde<K> keySerde, final Serde<V> valueSerde);
+
+ void evictWhile(final Supplier<Boolean> predicate, final Consumer<Eviction<K, V>> callback);
+
+ void put(long time, K key, V value, ProcessorRecordContext recordContext);
int numRecords();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
index da91b91..79957d1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
@@ -17,9 +17,13 @@
package org.apache.kafka.streams.integration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
@@ -35,6 +39,7 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
@@ -44,6 +49,8 @@ import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.hamcrest.Matchers;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -53,6 +60,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Properties;
+import java.util.stream.Collectors;
import static java.lang.Long.MAX_VALUE;
import static java.time.Duration.ofMillis;
@@ -62,6 +70,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.DEFAULT_TIMEOUT;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes;
@@ -73,7 +82,7 @@ import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-@Category({IntegrationTest.class})
+@Category(IntegrationTest.class)
public class SuppressionIntegrationTest {
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
@@ -147,7 +156,7 @@ public class SuppressionIntegrationTest {
}
}
- private KTable<String, Long> buildCountsTable(final String input, final StreamsBuilder builder) {
+ private static KTable<String, Long> buildCountsTable(final String input, final StreamsBuilder builder) {
return builder
.table(
input,
@@ -161,6 +170,139 @@ public class SuppressionIntegrationTest {
}
@Test
+ public void shouldUseDefaultSerdes() {
+ final String testId = "-shouldInheritSerdes";
+ final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
+ final String input = "input" + testId;
+ final String outputSuppressed = "output-suppressed" + testId;
+ final String outputRaw = "output-raw" + testId;
+
+ cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed);
+
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final KStream<String, String> inputStream = builder.stream(input);
+
+ final KTable<String, String> valueCounts = inputStream
+ .groupByKey()
+ .aggregate(() -> "()", (key, value, aggregate) -> aggregate + ",(" + key + ": " + value + ")");
+
+ valueCounts
+ .suppress(untilTimeLimit(ofMillis(MAX_VALUE), maxRecords(1L).emitEarlyWhenFull()))
+ .toStream()
+ .to(outputSuppressed);
+
+ valueCounts
+ .toStream()
+ .to(outputRaw);
+
+ final Properties streamsConfig = getStreamsConfig(appId);
+ streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+ streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+
+ final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
+ try {
+ produceSynchronously(
+ input,
+ asList(
+ new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)),
+ new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)),
+ new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)),
+ new KeyValueTimestamp<>("x", "x", scaledTime(3L))
+ )
+ );
+ final boolean rawRecords = waitForAnyRecord(outputRaw);
+ final boolean suppressedRecords = waitForAnyRecord(outputSuppressed);
+ assertThat(rawRecords, Matchers.is(true));
+ assertThat(suppressedRecords, is(true));
+ } finally {
+ driver.close();
+ cleanStateAfterTest(CLUSTER, driver);
+ }
+ }
+
+ @Test
+ public void shouldInheritSerdes() {
+ final String testId = "-shouldInheritSerdes";
+ final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
+ final String input = "input" + testId;
+ final String outputSuppressed = "output-suppressed" + testId;
+ final String outputRaw = "output-raw" + testId;
+
+ cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed);
+
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final KStream<String, String> inputStream = builder.stream(input);
+
+ // count sets the serde to Long
+ final KTable<String, Long> valueCounts = inputStream
+ .groupByKey()
+ .count();
+
+ valueCounts
+ .suppress(untilTimeLimit(ofMillis(MAX_VALUE), maxRecords(1L).emitEarlyWhenFull()))
+ .toStream()
+ .to(outputSuppressed);
+
+ valueCounts
+ .toStream()
+ .to(outputRaw);
+
+ final Properties streamsConfig = getStreamsConfig(appId);
+ streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+ streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+
+ final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
+ try {
+ produceSynchronously(
+ input,
+ asList(
+ new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)),
+ new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)),
+ new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)),
+ new KeyValueTimestamp<>("x", "x", scaledTime(3L))
+ )
+ );
+ final boolean rawRecords = waitForAnyRecord(outputRaw);
+ final boolean suppressedRecords = waitForAnyRecord(outputSuppressed);
+ assertThat(rawRecords, Matchers.is(true));
+ assertThat(suppressedRecords, is(true));
+ } finally {
+ driver.close();
+ cleanStateAfterTest(CLUSTER, driver);
+ }
+ }
+
+ private static boolean waitForAnyRecord(final String topic) {
+ final Properties properties = new Properties();
+ properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+ try (final Consumer<Object, Object> consumer = new KafkaConsumer<>(properties)) {
+ final List<TopicPartition> partitions =
+ consumer.partitionsFor(topic)
+ .stream()
+ .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
+ .collect(Collectors.toList());
+ consumer.assign(partitions);
+ consumer.seekToBeginning(partitions);
+ final long start = System.currentTimeMillis();
+ while ((System.currentTimeMillis() - start) < DEFAULT_TIMEOUT) {
+ final ConsumerRecords<Object, Object> records = consumer.poll(ofMillis(500));
+
+ if (!records.isEmpty()) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+ }
+
+ @Test
public void shouldNotSuppressIntermediateEventsWithZeroEmitAfter() {
final String testId = "-shouldNotSuppressIntermediateEventsWithZeroEmitAfter";
final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
@@ -336,7 +478,7 @@ public class SuppressionIntegrationTest {
valueCounts
// this is a bit brittle, but I happen to know that the entries are a little over 100 bytes in size.
- .suppress(untilTimeLimit(Duration.ofMillis(MAX_VALUE), maxBytes(200L).emitEarlyWhenFull()))
+ .suppress(untilTimeLimit(ofMillis(MAX_VALUE), maxBytes(200L).emitEarlyWhenFull()))
.toStream()
.to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
@@ -396,7 +538,7 @@ public class SuppressionIntegrationTest {
valueCounts
// this is a bit brittle, but I happen to know that the entries are a little over 100 bytes in size.
- .suppress(untilTimeLimit(Duration.ofMillis(MAX_VALUE), maxBytes(200L).shutDownWhenFull()))
+ .suppress(untilTimeLimit(ofMillis(MAX_VALUE), maxBytes(200L).shutDownWhenFull()))
.toStream()
.to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
@@ -490,17 +632,18 @@ public class SuppressionIntegrationTest {
}
}
- private Properties getStreamsConfig(final String appId) {
+ private static Properties getStreamsConfig(final String appId) {
return mkProperties(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
mkEntry(StreamsConfig.POLL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)),
mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)),
- mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, AT_LEAST_ONCE)
+ mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, AT_LEAST_ONCE),
+ mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath())
));
}
- private String scaledWindowKey(final String key, final long unscaledStart, final long unscaledEnd) {
+ private static String scaledWindowKey(final String key, final long unscaledStart, final long unscaledEnd) {
return new Windowed<>(key, new TimeWindow(scaledTime(unscaledStart), scaledTime(unscaledEnd))).toString();
}
@@ -508,11 +651,11 @@ public class SuppressionIntegrationTest {
* scaling to ensure that there are commits in between the various test events,
* just to exercise that everything works properly in the presence of commits.
*/
- private long scaledTime(final long unscaledTime) {
+ private static long scaledTime(final long unscaledTime) {
return COMMIT_INTERVAL * 2 * unscaledTime;
}
- private void produceSynchronously(final String topic, final List<KeyValueTimestamp<String, String>> toProduce) {
+ private static void produceSynchronously(final String topic, final List<KeyValueTimestamp<String, String>> toProduce) {
final Properties producerConfig = mkProperties(mkMap(
mkEntry(ProducerConfig.CLIENT_ID_CONFIG, "anything"),
mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ((Serializer<String>) STRING_SERIALIZER).getClass().getName()),
@@ -522,12 +665,12 @@ public class SuppressionIntegrationTest {
IntegrationTestUtils.produceSynchronously(producerConfig, false, topic, Optional.empty(), toProduce);
}
- private void verifyErrorShutdown(final KafkaStreams driver) throws InterruptedException {
+ private static void verifyErrorShutdown(final KafkaStreams driver) throws InterruptedException {
waitForCondition(() -> !driver.state().isRunning(), TIMEOUT_MS, "Streams didn't shut down.");
assertThat(driver.state(), is(KafkaStreams.State.ERROR));
}
- private void verifyOutput(final String topic, final List<KeyValueTimestamp<String, Long>> keyValueTimestamps) {
+ private static void verifyOutput(final String topic, final List<KeyValueTimestamp<String, Long>> keyValueTimestamps) {
final Properties properties = mkProperties(
mkMap(
mkEntry(ConsumerConfig.GROUP_ID_CONFIG, "test-group"),
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
index a6a8888..ddba05e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
@@ -29,7 +29,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
public class FullChangeSerdeTest {
- private final FullChangeSerde<String> serde = new FullChangeSerde<>(Serdes.String());
+ private final FullChangeSerde<String> serde = FullChangeSerde.castOrWrap(Serdes.String());
@Test
public void shouldRoundTripNull() {
@@ -77,31 +77,28 @@ public class FullChangeSerdeTest {
);
}
- @SuppressWarnings("unchecked")
@Test
public void shouldConfigureSerde() {
final Serde<Void> mock = EasyMock.mock(Serde.class);
mock.configure(emptyMap(), false);
EasyMock.expectLastCall();
EasyMock.replay(mock);
- final FullChangeSerde<Void> serde = new FullChangeSerde<>(mock);
+ final FullChangeSerde<Void> serde = FullChangeSerde.castOrWrap(mock);
serde.configure(emptyMap(), false);
EasyMock.verify(mock);
}
- @SuppressWarnings("unchecked")
@Test
public void shouldCloseSerde() {
final Serde<Void> mock = EasyMock.mock(Serde.class);
mock.close();
EasyMock.expectLastCall();
EasyMock.replay(mock);
- final FullChangeSerde<Void> serde = new FullChangeSerde<>(mock);
+ final FullChangeSerde<Void> serde = FullChangeSerde.castOrWrap(mock);
serde.close();
EasyMock.verify(mock);
}
- @SuppressWarnings("unchecked")
@Test
public void shouldConfigureSerializer() {
final Serde<Void> mockSerde = EasyMock.mock(Serde.class);
@@ -111,13 +108,12 @@ public class FullChangeSerdeTest {
mockSerializer.configure(emptyMap(), false);
EasyMock.expectLastCall();
EasyMock.replay(mockSerializer);
- final Serializer<Change<Void>> serializer = new FullChangeSerde<>(mockSerde).serializer();
+ final Serializer<Change<Void>> serializer = FullChangeSerde.castOrWrap(mockSerde).serializer();
serializer.configure(emptyMap(), false);
EasyMock.verify(mockSerde);
EasyMock.verify(mockSerializer);
}
- @SuppressWarnings("unchecked")
@Test
public void shouldCloseSerializer() {
final Serde<Void> mockSerde = EasyMock.mock(Serde.class);
@@ -127,13 +123,12 @@ public class FullChangeSerdeTest {
mockSerializer.close();
EasyMock.expectLastCall();
EasyMock.replay(mockSerializer);
- final Serializer<Change<Void>> serializer = new FullChangeSerde<>(mockSerde).serializer();
+ final Serializer<Change<Void>> serializer = FullChangeSerde.castOrWrap(mockSerde).serializer();
serializer.close();
EasyMock.verify(mockSerde);
EasyMock.verify(mockSerializer);
}
- @SuppressWarnings("unchecked")
@Test
public void shouldConfigureDeserializer() {
final Serde<Void> mockSerde = EasyMock.mock(Serde.class);
@@ -143,13 +138,12 @@ public class FullChangeSerdeTest {
mockDeserializer.configure(emptyMap(), false);
EasyMock.expectLastCall();
EasyMock.replay(mockDeserializer);
- final Deserializer<Change<Void>> serializer = new FullChangeSerde<>(mockSerde).deserializer();
+ final Deserializer<Change<Void>> serializer = FullChangeSerde.castOrWrap(mockSerde).deserializer();
serializer.configure(emptyMap(), false);
EasyMock.verify(mockSerde);
EasyMock.verify(mockDeserializer);
}
- @SuppressWarnings("unchecked")
@Test
public void shouldCloseDeserializer() {
final Serde<Void> mockSerde = EasyMock.mock(Serde.class);
@@ -159,7 +153,7 @@ public class FullChangeSerdeTest {
mockDeserializer.close();
EasyMock.expectLastCall();
EasyMock.replay(mockDeserializer);
- final Deserializer<Change<Void>> serializer = new FullChangeSerde<>(mockSerde).deserializer();
+ final Deserializer<Change<Void>> serializer = FullChangeSerde.castOrWrap(mockSerde).deserializer();
serializer.close();
EasyMock.verify(mockSerde);
EasyMock.verify(mockDeserializer);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
index bb1bc0f..6246459 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
@@ -72,11 +72,12 @@ public class KTableSuppressProcessorTest {
final String storeName = "test-store";
- final StateStore buffer = new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName)
+ final StateStore buffer = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(storeName, keySerde, FullChangeSerde.castOrWrap(valueSerde))
.withLoggingDisabled()
.build();
+
final KTableSuppressProcessor<K, V> processor =
- new KTableSuppressProcessor<>(getImpl(suppressed), storeName, keySerde, new FullChangeSerde<>(valueSerde));
+ new KTableSuppressProcessor<>((SuppressedInternal<K>) suppressed, storeName);
final MockInternalProcessorContext context = new MockInternalProcessorContext();
buffer.init(context, buffer);
@@ -201,7 +202,6 @@ public class KTableSuppressProcessorTest {
// note the record is in the past, but the window end is in the future, so we still have to buffer,
// even though the grace period is 0.
final long timestamp = 5L;
- final long streamTime = 99L;
final long windowEnd = 100L;
context.setRecordMetadata("", 0, 0L, null, timestamp);
final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, windowEnd));
@@ -237,8 +237,12 @@ public class KTableSuppressProcessorTest {
assertThat(capturedForward.timestamp(), is(timestamp));
}
+ /**
+ * It's desirable to drop tombstones for final-results windowed streams, since (as described in the
+ * {@link SuppressedInternal} javadoc), they are unnecessary to emit.
+ */
@Test
- public void finalResultsShouldSuppressTombstonesForTimeWindows() {
+ public void finalResultsShouldDropTombstonesForTimeWindows() {
final Harness<Windowed<String>, Long> harness =
new Harness<>(finalResults(ofMillis(0L)), timeWindowedSerdeFrom(String.class, 100L), Long());
final MockInternalProcessorContext context = harness.context;
@@ -253,8 +257,13 @@ public class KTableSuppressProcessorTest {
assertThat(context.forwarded(), hasSize(0));
}
+
+ /**
+ * It's desirable to drop tombstones for final-results windowed streams, since (as described in the
+ * {@link SuppressedInternal} javadoc), they are unnecessary to emit.
+ */
@Test
- public void finalResultsShouldSuppressTombstonesForSessionWindows() {
+ public void finalResultsShouldDropTombstonesForSessionWindows() {
final Harness<Windowed<String>, Long> harness =
new Harness<>(finalResults(ofMillis(0L)), sessionWindowedSerdeFrom(String.class), Long());
final MockInternalProcessorContext context = harness.context;
@@ -269,8 +278,12 @@ public class KTableSuppressProcessorTest {
assertThat(context.forwarded(), hasSize(0));
}
+ /**
+ * It's NOT OK to drop tombstones for non-final-results windowed streams, since we may have emitted some results for
+ * the window before getting the tombstone (see the {@link SuppressedInternal} javadoc).
+ */
@Test
- public void suppressShouldNotSuppressTombstonesForTimeWindows() {
+ public void suppressShouldNotDropTombstonesForTimeWindows() {
final Harness<Windowed<String>, Long> harness =
new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), timeWindowedSerdeFrom(String.class, 100L), Long());
final MockInternalProcessorContext context = harness.context;
@@ -288,8 +301,13 @@ public class KTableSuppressProcessorTest {
assertThat(capturedForward.timestamp(), is(timestamp));
}
+
+ /**
+ * It's NOT OK to drop tombstones for non-final-results windowed streams, since we may have emitted some results for
+ * the window before getting the tombstone (see the {@link SuppressedInternal} javadoc).
+ */
@Test
- public void suppressShouldNotSuppressTombstonesForSessionWindows() {
+ public void suppressShouldNotDropTombstonesForSessionWindows() {
final Harness<Windowed<String>, Long> harness =
new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), sessionWindowedSerdeFrom(String.class), Long());
final MockInternalProcessorContext context = harness.context;
@@ -307,8 +325,13 @@ public class KTableSuppressProcessorTest {
assertThat(capturedForward.timestamp(), is(timestamp));
}
+
+ /**
+ * It's SUPER NOT OK to drop tombstones for non-windowed streams, since we may have emitted some results for
+ * the key before getting the tombstone (see the {@link SuppressedInternal} javadoc).
+ */
@Test
- public void suppressShouldNotSuppressTombstonesForKTable() {
+ public void suppressShouldNotDropTombstonesForKTable() {
final Harness<String, Long> harness =
new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), String(), Long());
final MockInternalProcessorContext context = harness.context;
@@ -416,8 +439,8 @@ public class KTableSuppressProcessorTest {
}
}
- @SuppressWarnings("unchecked")
- private <K extends Windowed> SuppressedInternal<K> finalResults(final Duration grace) {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private static <K extends Windowed> SuppressedInternal<K> finalResults(final Duration grace) {
return ((FinalResultsSuppressionBuilder) untilWindowCloses(unbounded())).buildFinalResultsSuppression(grace);
}
@@ -441,11 +464,7 @@ public class KTableSuppressProcessorTest {
};
}
- private static <K> SuppressedInternal<K> getImpl(final Suppressed<K> suppressed) {
- return (SuppressedInternal<K>) suppressed;
- }
-
- private <K> Serde<Windowed<K>> timeWindowedSerdeFrom(final Class<K> rawType, final long windowSize) {
+ private static <K> Serde<Windowed<K>> timeWindowedSerdeFrom(final Class<K> rawType, final long windowSize) {
final Serde<K> kSerde = Serdes.serdeFrom(rawType);
return new Serdes.WrapperSerde<>(
new TimeWindowedSerializer<>(kSerde.serializer()),
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 43df1d2..1864547 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -96,7 +96,7 @@ public class AbstractProcessorContextTest {
@Test
public void shouldReturnNullIfTopicEqualsNonExistTopic() {
- context.setRecordContext(new ProcessorRecordContext(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC));
+ context.setRecordContext(new ProcessorRecordContext(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC, null));
assertThat(context.topic(), nullValue());
}
@@ -154,7 +154,7 @@ public class AbstractProcessorContextTest {
@Test
public void shouldReturnNullIfHeadersAreNotSet() {
- context.setRecordContext(new ProcessorRecordContext(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC));
+ context.setRecordContext(new ProcessorRecordContext(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC, null));
assertThat(context.headers(), nullValue());
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java
index ddc4046..18f689f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java
@@ -22,11 +22,11 @@ public class InMemoryTimeOrderedKeyValueBufferTest {
@Test
public void bufferShouldAllowCacheEnablement() {
- new InMemoryTimeOrderedKeyValueBuffer.Builder(null).withCachingEnabled();
+ new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null).withCachingEnabled();
}
@Test
public void bufferShouldAllowCacheDisablement() {
- new InMemoryTimeOrderedKeyValueBuffer.Builder(null).withCachingDisabled();
+ new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null).withCachingDisabled();
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
index 2953953..6ae36d4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
@@ -22,13 +22,14 @@ import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer.Eviction;
import org.apache.kafka.test.MockInternalProcessorContext;
import org.apache.kafka.test.MockInternalProcessorContext.MockRecordCollector;
import org.apache.kafka.test.TestUtils;
@@ -55,7 +56,7 @@ import static org.hamcrest.Matchers.is;
import static org.junit.Assert.fail;
@RunWith(Parameterized.class)
-public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer> {
+public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<String, String>> {
private static final RecordHeaders V_1_CHANGELOG_HEADERS =
new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})});
@@ -69,9 +70,9 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
return singletonList(
new Object[] {
"in-memory buffer",
- (Function<String, InMemoryTimeOrderedKeyValueBuffer>) name ->
- (InMemoryTimeOrderedKeyValueBuffer) new InMemoryTimeOrderedKeyValueBuffer
- .Builder(name)
+ (Function<String, InMemoryTimeOrderedKeyValueBuffer<String, String>>) name ->
+ new InMemoryTimeOrderedKeyValueBuffer
+ .Builder<>(name, Serdes.String(), Serdes.String())
.build()
}
);
@@ -96,7 +97,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
}
- private static void cleanup(final MockInternalProcessorContext context, final TimeOrderedKeyValueBuffer buffer) {
+ private static void cleanup(final MockInternalProcessorContext context, final TimeOrderedKeyValueBuffer<String, String> buffer) {
try {
buffer.close();
Utils.delete(context.stateDir());
@@ -107,7 +108,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
@Test
public void shouldInit() {
- final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+ final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
buffer.init(context, buffer);
cleanup(context, buffer);
@@ -115,23 +116,23 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
@Test
public void shouldAcceptData() {
- final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+ final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
buffer.init(context, buffer);
- putRecord(buffer, context, "2p93nf", 0, "asdf");
+ putRecord(buffer, context, 0L, 0L, "asdf", "2p93nf");
cleanup(context, buffer);
}
@Test
public void shouldRejectNullValues() {
- final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+ final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
buffer.init(context, buffer);
try {
- buffer.put(0, getBytes("asdf"), new ContextualRecord(
- null,
- new ProcessorRecordContext(0, 0, 0, "topic")
- ));
+ buffer.put(0, "asdf",
+ null,
+ getContext(0)
+ );
fail("expected an exception");
} catch (final NullPointerException expected) {
// expected
@@ -139,27 +140,12 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
cleanup(context, buffer);
}
- private static ContextualRecord getRecord(final String value) {
- return getRecord(value, 0L);
- }
-
- private static ContextualRecord getRecord(final String value, final long timestamp) {
- return new ContextualRecord(
- value.getBytes(UTF_8),
- new ProcessorRecordContext(timestamp, 0, 0, "topic")
- );
- }
-
- private static Bytes getBytes(final String key) {
- return Bytes.wrap(key.getBytes(UTF_8));
- }
-
@Test
public void shouldRemoveData() {
- final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+ final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
buffer.init(context, buffer);
- putRecord(buffer, context, "qwer", 0, "asdf");
+ putRecord(buffer, context, 0L, 0L, "asdf", "qwer");
assertThat(buffer.numRecords(), is(1));
buffer.evictWhile(() -> true, kv -> { });
assertThat(buffer.numRecords(), is(0));
@@ -168,90 +154,71 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
@Test
public void shouldRespectEvictionPredicate() {
- final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+ final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
buffer.init(context, buffer);
- final Bytes firstKey = getBytes("asdf");
- final ContextualRecord firstRecord = getRecord("eyt");
- putRecord(0, buffer, context, firstRecord, firstKey);
- putRecord(buffer, context, "rtg", 1, "zxcv");
+ putRecord(buffer, context, 0L, 0L, "asdf", "eyt");
+ putRecord(buffer, context, 1L, 0L, "zxcv", "rtg");
assertThat(buffer.numRecords(), is(2));
- final List<KeyValue<Bytes, ContextualRecord>> evicted = new LinkedList<>();
+ final List<Eviction<String, String>> evicted = new LinkedList<>();
buffer.evictWhile(() -> buffer.numRecords() > 1, evicted::add);
assertThat(buffer.numRecords(), is(1));
- assertThat(evicted, is(singletonList(new KeyValue<>(firstKey, firstRecord))));
+ assertThat(evicted, is(singletonList(new Eviction<>("asdf", "eyt", getContext(0L)))));
cleanup(context, buffer);
}
@Test
public void shouldTrackCount() {
- final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+ final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
buffer.init(context, buffer);
- putRecord(buffer, context, "oin", 0, "asdf");
+ putRecord(buffer, context, 0L, 0L, "asdf", "oin");
assertThat(buffer.numRecords(), is(1));
- putRecord(buffer, context, "wekjn", 1, "asdf");
+ putRecord(buffer, context, 1L, 0L, "asdf", "wekjn");
assertThat(buffer.numRecords(), is(1));
- putRecord(buffer, context, "24inf", 0, "zxcv");
+ putRecord(buffer, context, 0L, 0L, "zxcv", "24inf");
assertThat(buffer.numRecords(), is(2));
cleanup(context, buffer);
}
@Test
public void shouldTrackSize() {
- final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+ final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
buffer.init(context, buffer);
- putRecord(buffer, context, "23roni", 0, "asdf");
+ putRecord(buffer, context, 0L, 0L, "asdf", "23roni");
assertThat(buffer.bufferSize(), is(43L));
- putRecord(buffer, context, "3l", 1, "asdf");
+ putRecord(buffer, context, 1L, 0L, "asdf", "3l");
assertThat(buffer.bufferSize(), is(39L));
- putRecord(buffer, context, "qfowin", 0, "zxcv");
+ putRecord(buffer, context, 0L, 0L, "zxcv", "qfowin");
assertThat(buffer.bufferSize(), is(82L));
cleanup(context, buffer);
}
@Test
public void shouldTrackMinTimestamp() {
- final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+ final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
buffer.init(context, buffer);
- putRecord(buffer, context, "2093j", 1, "asdf");
+ putRecord(buffer, context, 1L, 0L, "asdf", "2093j");
assertThat(buffer.minTimestamp(), is(1L));
- putRecord(buffer, context, "3gon4i", 0, "zxcv");
+ putRecord(buffer, context, 0L, 0L, "zxcv", "3gon4i");
assertThat(buffer.minTimestamp(), is(0L));
cleanup(context, buffer);
}
- private static void putRecord(final TimeOrderedKeyValueBuffer buffer,
- final MockInternalProcessorContext context,
- final String value,
- final int time,
- final String key) {
- putRecord(time, buffer, context, getRecord(value), getBytes(key));
- }
-
- private static void putRecord(final int time,
- final TimeOrderedKeyValueBuffer buffer,
- final MockInternalProcessorContext context,
- final ContextualRecord firstRecord,
- final Bytes firstKey) {
- context.setRecordContext(firstRecord.recordContext());
- buffer.put(time, firstKey, firstRecord);
- }
-
@Test
public void shouldEvictOldestAndUpdateSizeAndCountAndMinTimestamp() {
- final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+ final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
buffer.init(context, buffer);
- putRecord(buffer, context, "o23i4", 1, "zxcv");
+ putRecord(buffer, context, 1L, 0L, "zxcv", "o23i4");
assertThat(buffer.numRecords(), is(1));
assertThat(buffer.bufferSize(), is(42L));
assertThat(buffer.minTimestamp(), is(1L));
- putRecord(buffer, context, "3ng", 0, "asdf");
+ putRecord(buffer, context, 0L, 0L, "asdf", "3ng");
assertThat(buffer.numRecords(), is(2));
assertThat(buffer.bufferSize(), is(82L));
assertThat(buffer.minTimestamp(), is(0L));
@@ -260,14 +227,14 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
buffer.evictWhile(() -> true, kv -> {
switch (callbackCount.incrementAndGet()) {
case 1: {
- assertThat(new String(kv.key.get(), UTF_8), is("asdf"));
+ assertThat(kv.key(), is("asdf"));
assertThat(buffer.numRecords(), is(2));
assertThat(buffer.bufferSize(), is(82L));
assertThat(buffer.minTimestamp(), is(0L));
break;
}
case 2: {
- assertThat(new String(kv.key.get(), UTF_8), is("zxcv"));
+ assertThat(kv.key(), is("zxcv"));
assertThat(buffer.numRecords(), is(1));
assertThat(buffer.bufferSize(), is(42L));
assertThat(buffer.minTimestamp(), is(1L));
@@ -288,12 +255,12 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
@Test
public void shouldFlush() {
- final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+ final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
buffer.init(context, buffer);
- putRecord(2, buffer, context, getRecord("2093j", 0L), getBytes("asdf"));
- putRecord(1, buffer, context, getRecord("3gon4i", 1L), getBytes("zxcv"));
- putRecord(0, buffer, context, getRecord("deadbeef", 2L), getBytes("deleteme"));
+ putRecord(buffer, context, 2L, 0L, "asdf", "2093j");
+ putRecord(buffer, context, 1L, 1L, "zxcv", "3gon4i");
+ putRecord(buffer, context, 0L, 2L, "deleteme", "deadbeef");
// replace "deleteme" with a tombstone
buffer.evictWhile(() -> buffer.minTimestamp() < 1, kv -> { });
@@ -357,17 +324,16 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
cleanup(context, buffer);
}
-
@Test
public void shouldRestoreOldFormat() {
- final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+ final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
buffer.init(context, buffer);
final RecordBatchingStateRestoreCallback stateRestoreCallback =
(RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
- context.setRecordContext(new ProcessorRecordContext(0, 0, 0, ""));
+ context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
stateRestoreCallback.restoreBatch(asList(
new ConsumerRecord<>("changelog-topic",
@@ -425,7 +391,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
// flush the buffer into a list in buffer order so we can make assertions about the contents.
- final List<KeyValue<Bytes, ContextualRecord>> evicted = new LinkedList<>();
+ final List<Eviction<String, String>> evicted = new LinkedList<>();
buffer.evictWhile(() -> true, evicted::add);
// Several things to note:
@@ -437,22 +403,14 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
// original format.
assertThat(evicted, is(asList(
- new KeyValue<>(
- getBytes("zxcv"),
- new ContextualRecord("3o4im".getBytes(UTF_8),
- new ProcessorRecordContext(2,
- 2,
- 0,
- "changelog-topic",
- new RecordHeaders()))),
- new KeyValue<>(
- getBytes("asdf"),
- new ContextualRecord("qwer".getBytes(UTF_8),
- new ProcessorRecordContext(1,
- 1,
- 0,
- "changelog-topic",
- new RecordHeaders())))
+ new Eviction<>(
+ "zxcv",
+ "3o4im",
+ new ProcessorRecordContext(2L, 2, 0, "changelog-topic", new RecordHeaders())),
+ new Eviction<>(
+ "asdf",
+ "qwer",
+ new ProcessorRecordContext(1L, 1, 0, "changelog-topic", new RecordHeaders()))
)));
cleanup(context, buffer);
@@ -460,14 +418,14 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
@Test
public void shouldRestoreNewFormat() {
- final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+ final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
buffer.init(context, buffer);
final RecordBatchingStateRestoreCallback stateRestoreCallback =
(RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
- context.setRecordContext(new ProcessorRecordContext(0, 0, 0, ""));
+ context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
final RecordHeaders v1FlagHeaders = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})});
@@ -533,7 +491,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
// flush the buffer into a list in buffer order so we can make assertions about the contents.
- final List<KeyValue<Bytes, ContextualRecord>> evicted = new LinkedList<>();
+ final List<Eviction<String, String>> evicted = new LinkedList<>();
buffer.evictWhile(() -> true, evicted::add);
// Several things to note:
@@ -541,41 +499,33 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
// * The record timestamps are properly restored, and not conflated with the record's buffer time.
// * The keys and values are properly restored
// * The record topic is set to the original input topic, *not* the changelog topic
- // * The record offset preserves the origininal input record's offset, *not* the offset of the changelog record
+ // * The record offset preserves the original input record's offset, *not* the offset of the changelog record
assertThat(evicted, is(asList(
- new KeyValue<>(
- getBytes("zxcv"),
- new ContextualRecord("3o4im".getBytes(UTF_8),
- new ProcessorRecordContext(2,
- 0,
- 0,
- "topic",
- null))),
- new KeyValue<>(
- getBytes("asdf"),
- new ContextualRecord("qwer".getBytes(UTF_8),
- new ProcessorRecordContext(1,
- 0,
- 0,
- "topic",
- null)))
- )));
+ new Eviction<>(
+ "zxcv",
+ "3o4im",
+ getContext(2L)),
+ new Eviction<>(
+ "asdf",
+ "qwer",
+ getContext(1L)
+ ))));
cleanup(context, buffer);
}
@Test
public void shouldNotRestoreUnrecognizedVersionRecord() {
- final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+ final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
buffer.init(context, buffer);
final RecordBatchingStateRestoreCallback stateRestoreCallback =
(RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
- context.setRecordContext(new ProcessorRecordContext(0, 0, 0, ""));
+ context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
final RecordHeaders unknownFlagHeaders = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) -1})});
@@ -601,4 +551,26 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
cleanup(context, buffer);
}
}
+
+ private static void putRecord(final TimeOrderedKeyValueBuffer<String, String> buffer,
+ final MockInternalProcessorContext context,
+ final long streamTime,
+ final long recordTimestamp,
+ final String key,
+ final String value) {
+ final ProcessorRecordContext recordContext = getContext(recordTimestamp);
+ context.setRecordContext(recordContext);
+ buffer.put(streamTime, key, value, recordContext);
+ }
+
+ private static ContextualRecord getRecord(final String value, final long timestamp) {
+ return new ContextualRecord(
+ value.getBytes(UTF_8),
+ getContext(timestamp)
+ );
+ }
+
+ private static ProcessorRecordContext getContext(final long recordTimestamp) {
+ return new ProcessorRecordContext(recordTimestamp, 0, 0, "topic", null);
+ }
}