You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/04/23 00:22:13 UTC
[kafka] branch 2.2 updated: KAFKA-7895: fix Suppress changelog restore (#6536) (#6615)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push:
new 6c74506 KAFKA-7895: fix Suppress changelog restore (#6536) (#6615)
6c74506 is described below
commit 6c74506728af0b03c1aae167f6957efe4b128173
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Mon Apr 22 19:21:56 2019 -0500
KAFKA-7895: fix Suppress changelog restore (#6536) (#6615)
Cherry-picked from #6536 / 6538e9e
Reviewers: Bill Bejeck <bb...@gmail.com>
---
.../kafka/clients/consumer/ConsumerRecord.java | 6 +-
.../suppress/KTableSuppressProcessor.java | 4 +-
.../internals/ProcessorRecordContext.java | 137 ++++-
.../streams/state/internals/ContextualRecord.java | 43 +-
.../InMemoryTimeOrderedKeyValueBuffer.java | 173 ++++--
.../apache/kafka/streams/KeyValueTimestamp.java | 17 +
.../SuppressionDurabilityIntegrationTest.java | 166 ++++--
.../integration/SuppressionIntegrationTest.java | 3 +-
.../integration/utils/IntegrationTestUtils.java | 60 +-
.../suppress/KTableSuppressProcessorTest.java | 1 -
.../internals/TimeOrderedKeyValueBufferTest.java | 604 +++++++++++++++++++++
.../kafka/test/MockInternalProcessorContext.java | 104 +++-
12 files changed, 1200 insertions(+), 118 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
index 0413d5b..a7dad7b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
@@ -157,6 +157,8 @@ public class ConsumerRecord<K, V> {
Optional<Integer> leaderEpoch) {
if (topic == null)
throw new IllegalArgumentException("Topic cannot be null");
+ if (headers == null)
+ throw new IllegalArgumentException("Headers cannot be null");
this.topic = topic;
this.partition = partition;
@@ -173,7 +175,7 @@ public class ConsumerRecord<K, V> {
}
/**
- * The topic this record is received from
+ * The topic this record is received from (never null)
*/
public String topic() {
return this.topic;
@@ -187,7 +189,7 @@ public class ConsumerRecord<K, V> {
}
/**
- * The headers
+ * The headers (never null)
*/
public Headers headers() {
return headers;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
index 691e09e..686002a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
@@ -33,8 +33,6 @@ import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.state.internals.ContextualRecord;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
-import java.util.Objects;
-
import static java.util.Objects.requireNonNull;
public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
@@ -78,7 +76,7 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
keySerde = keySerde == null ? (Serde<K>) context.keySerde() : keySerde;
valueSerde = valueSerde == null ? FullChangeSerde.castOrWrap(context.valueSerde()) : valueSerde;
- buffer = Objects.requireNonNull((TimeOrderedKeyValueBuffer) context.getStateStore(storeName));
+ buffer = requireNonNull((TimeOrderedKeyValueBuffer) context.getStateStore(storeName));
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index 0001274..aacb801 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -18,10 +18,15 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.processor.RecordContext;
+import java.nio.ByteBuffer;
import java.util.Objects;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
public class ProcessorRecordContext implements RecordContext {
long timestamp;
@@ -80,13 +85,13 @@ public class ProcessorRecordContext implements RecordContext {
}
public long sizeBytes() {
- long size = 0L;
- size += 8; // value.context.timestamp
- size += 8; // value.context.offset
+ long size = 0;
+ size += Long.BYTES; // value.context.timestamp
+ size += Long.BYTES; // value.context.offset
if (topic != null) {
size += topic.toCharArray().length;
}
- size += 4; // partition
+ size += Integer.BYTES; // partition
if (headers != null) {
for (final Header header : headers) {
size += header.key().toCharArray().length;
@@ -99,6 +104,111 @@ public class ProcessorRecordContext implements RecordContext {
return size;
}
+ public byte[] serialize() {
+ final byte[] topicBytes = topic.getBytes(UTF_8);
+ final byte[][] headerKeysBytes;
+ final byte[][] headerValuesBytes;
+
+
+ int size = 0;
+ size += Long.BYTES; // value.context.timestamp
+ size += Long.BYTES; // value.context.offset
+ size += Integer.BYTES; // size of topic
+ size += topicBytes.length;
+ size += Integer.BYTES; // partition
+ size += Integer.BYTES; // number of headers
+
+ if (headers == null) {
+ headerKeysBytes = headerValuesBytes = null;
+ } else {
+ final Header[] headers = this.headers.toArray();
+ headerKeysBytes = new byte[headers.length][];
+ headerValuesBytes = new byte[headers.length][];
+
+ for (int i = 0; i < headers.length; i++) {
+ size += 2 * Integer.BYTES; // sizes of key and value
+
+ final byte[] keyBytes = headers[i].key().getBytes(UTF_8);
+ size += keyBytes.length;
+ final byte[] valueBytes = headers[i].value();
+ if (valueBytes != null) {
+ size += valueBytes.length;
+ }
+
+ headerKeysBytes[i] = keyBytes;
+ headerValuesBytes[i] = valueBytes;
+ }
+ }
+
+ final ByteBuffer buffer = ByteBuffer.allocate(size);
+ buffer.putLong(timestamp);
+ buffer.putLong(offset);
+
+ // not handling the null condition because we believe topic will never be null in cases where we serialize
+ buffer.putInt(topicBytes.length);
+ buffer.put(topicBytes);
+
+ buffer.putInt(partition);
+ if (headers == null) {
+ buffer.putInt(-1);
+ } else {
+ buffer.putInt(headerKeysBytes.length);
+ for (int i = 0; i < headerKeysBytes.length; i++) {
+ buffer.putInt(headerKeysBytes[i].length);
+ buffer.put(headerKeysBytes[i]);
+
+ if (headerValuesBytes[i] != null) {
+ buffer.putInt(headerValuesBytes[i].length);
+ buffer.put(headerValuesBytes[i]);
+ } else {
+ buffer.putInt(-1);
+ }
+ }
+ }
+
+ return buffer.array();
+ }
+
+ public static ProcessorRecordContext deserialize(final ByteBuffer buffer) {
+ final long timestamp = buffer.getLong();
+ final long offset = buffer.getLong();
+ final int topicSize = buffer.getInt();
+ final String topic;
+ {
+ // not handling the null topic condition, because we believe the topic will never be null when we serialize
+ final byte[] topicBytes = new byte[topicSize];
+ buffer.get(topicBytes);
+ topic = new String(topicBytes, UTF_8);
+ }
+ final int partition = buffer.getInt();
+ final int headerCount = buffer.getInt();
+ final Headers headers;
+ if (headerCount == -1) {
+ headers = null;
+ } else {
+ final Header[] headerArr = new Header[headerCount];
+ for (int i = 0; i < headerCount; i++) {
+ final int keySize = buffer.getInt();
+ final byte[] keyBytes = new byte[keySize];
+ buffer.get(keyBytes);
+
+ final int valueSize = buffer.getInt();
+ final byte[] valueBytes;
+ if (valueSize == -1) {
+ valueBytes = null;
+ } else {
+ valueBytes = new byte[valueSize];
+ buffer.get(valueBytes);
+ }
+
+ headerArr[i] = new RecordHeader(new String(keyBytes, UTF_8), valueBytes);
+ }
+ headers = new RecordHeaders(headerArr);
+ }
+
+ return new ProcessorRecordContext(timestamp, offset, partition, topic, headers);
+ }
+
@Override
public boolean equals(final Object o) {
if (this == o) {
@@ -109,14 +219,25 @@ public class ProcessorRecordContext implements RecordContext {
}
final ProcessorRecordContext that = (ProcessorRecordContext) o;
return timestamp == that.timestamp &&
- offset == that.offset &&
- partition == that.partition &&
- Objects.equals(topic, that.topic) &&
- Objects.equals(headers, that.headers);
+ offset == that.offset &&
+ partition == that.partition &&
+ Objects.equals(topic, that.topic) &&
+ Objects.equals(headers, that.headers);
}
@Override
public int hashCode() {
return Objects.hash(timestamp, offset, topic, partition, headers);
}
+
+ @Override
+ public String toString() {
+ return "ProcessorRecordContext{" +
+ "topic='" + topic + '\'' +
+ ", partition=" + partition +
+ ", offset=" + offset +
+ ", timestamp=" + timestamp +
+ ", headers=" + headers +
+ '}';
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
index 870eeee..3893b35 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
@@ -27,7 +28,7 @@ public class ContextualRecord {
public ContextualRecord(final byte[] value, final ProcessorRecordContext recordContext) {
this.value = value;
- this.recordContext = recordContext;
+ this.recordContext = Objects.requireNonNull(recordContext);
}
public ProcessorRecordContext recordContext() {
@@ -42,6 +43,38 @@ public class ContextualRecord {
return (value == null ? 0 : value.length) + recordContext.sizeBytes();
}
+ byte[] serialize() {
+ final byte[] serializedContext = recordContext.serialize();
+
+ final int sizeOfContext = serializedContext.length;
+ final int sizeOfValueLength = Integer.BYTES;
+ final int sizeOfValue = value == null ? 0 : value.length;
+ final ByteBuffer buffer = ByteBuffer.allocate(sizeOfContext + sizeOfValueLength + sizeOfValue);
+
+ buffer.put(serializedContext);
+ if (value == null) {
+ buffer.putInt(-1);
+ } else {
+ buffer.putInt(value.length);
+ buffer.put(value);
+ }
+
+ return buffer.array();
+ }
+
+ static ContextualRecord deserialize(final ByteBuffer buffer) {
+ final ProcessorRecordContext context = ProcessorRecordContext.deserialize(buffer);
+
+ final int valueLength = buffer.getInt();
+ if (valueLength == -1) {
+ return new ContextualRecord(null, context);
+ } else {
+ final byte[] value = new byte[valueLength];
+ buffer.get(value);
+ return new ContextualRecord(value, context);
+ }
+ }
+
@Override
public boolean equals(final Object o) {
if (this == o) {
@@ -59,4 +92,12 @@ public class ContextualRecord {
public int hashCode() {
return Objects.hash(value, recordContext);
}
+
+ @Override
+ public String toString() {
+ return "ContextualRecord{" +
+ "recordContext=" + recordContext +
+ ", value=" + Arrays.toString(value) +
+ '}';
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index 65f5388..8e58b16 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -17,6 +17,9 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.BytesSerializer;
@@ -45,9 +48,13 @@ import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.function.Supplier;
-public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuffer {
+import static java.util.Objects.requireNonNull;
+
+public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuffer {
private static final BytesSerializer KEY_SERIALIZER = new BytesSerializer();
private static final ByteArraySerializer VALUE_SERIALIZER = new ByteArraySerializer();
+ private static final RecordHeaders V_1_CHANGELOG_HEADERS =
+ new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})});
private final Map<Bytes, BufferKey> index = new HashMap<>();
private final TreeMap<BufferKey, ContextualRecord> sortedMap = new TreeMap<>();
@@ -65,6 +72,8 @@ public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuf
private volatile boolean open;
+ private int partition;
+
public static class Builder implements StoreBuilder<StateStore> {
private final String storeName;
@@ -130,7 +139,7 @@ public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuf
}
}
- private static class BufferKey implements Comparable<BufferKey> {
+ private static final class BufferKey implements Comparable<BufferKey> {
private final long time;
private final Bytes key;
@@ -163,6 +172,14 @@ public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuf
final int timeComparison = Long.compare(time, o.time);
return timeComparison == 0 ? key.compareTo(o.key) : timeComparison;
}
+
+ @Override
+ public String toString() {
+ return "BufferKey{" +
+ "key=" + key +
+ ", time=" + time +
+ '}';
+ }
}
private InMemoryTimeOrderedKeyValueBuffer(final String storeName, final boolean loggingEnabled) {
@@ -194,6 +211,7 @@ public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuf
}
updateBufferMetrics();
open = true;
+ partition = context.taskId().partition;
}
@Override
@@ -222,33 +240,52 @@ public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuf
if (bufferKey == null) {
// The record was evicted from the buffer. Send a tombstone.
- collector.send(changelogTopic, key, null, null, null, null, KEY_SERIALIZER, VALUE_SERIALIZER);
+ logTombstone(key);
} else {
final ContextualRecord value = sortedMap.get(bufferKey);
- final byte[] innerValue = value.value();
- final byte[] timeAndValue = ByteBuffer.wrap(new byte[8 + innerValue.length])
- .putLong(bufferKey.time)
- .put(innerValue)
- .array();
-
- final ProcessorRecordContext recordContext = value.recordContext();
- collector.send(
- changelogTopic,
- key,
- timeAndValue,
- recordContext.headers(),
- recordContext.partition(),
- recordContext.timestamp(),
- KEY_SERIALIZER,
- VALUE_SERIALIZER
- );
+ logValue(key, bufferKey, value);
}
}
dirtyKeys.clear();
}
}
+ private void logValue(final Bytes key, final BufferKey bufferKey, final ContextualRecord value) {
+ final byte[] serializedContextualRecord = value.serialize();
+
+ final int sizeOfBufferTime = Long.BYTES;
+ final int sizeOfContextualRecord = serializedContextualRecord.length;
+
+ final byte[] timeAndContextualRecord = ByteBuffer.wrap(new byte[sizeOfBufferTime + sizeOfContextualRecord])
+ .putLong(bufferKey.time)
+ .put(serializedContextualRecord)
+ .array();
+
+ collector.send(
+ changelogTopic,
+ key,
+ timeAndContextualRecord,
+ V_1_CHANGELOG_HEADERS,
+ partition,
+ null,
+ KEY_SERIALIZER,
+ VALUE_SERIALIZER
+ );
+ }
+
+ private void logTombstone(final Bytes key) {
+ collector.send(changelogTopic,
+ key,
+ null,
+ null,
+ partition,
+ null,
+ KEY_SERIALIZER,
+ VALUE_SERIALIZER
+ );
+ }
+
private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> batch) {
for (final ConsumerRecord<byte[], byte[]> record : batch) {
final Bytes key = Bytes.wrap(record.key());
@@ -256,26 +293,62 @@ public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuf
// This was a tombstone. Delete the record.
final BufferKey bufferKey = index.remove(key);
if (bufferKey != null) {
- sortedMap.remove(bufferKey);
+ final ContextualRecord removed = sortedMap.remove(bufferKey);
+ if (removed != null) {
+ memBufferSize -= computeRecordSize(bufferKey.key, removed);
+ }
+ if (bufferKey.time == minTimestamp) {
+ minTimestamp = sortedMap.isEmpty() ? Long.MAX_VALUE : sortedMap.firstKey().time;
+ }
+ }
+
+ if (record.partition() != partition) {
+ throw new IllegalStateException(
+ String.format(
+ "record partition [%d] is being restored by the wrong suppress partition [%d]",
+ record.partition(),
+ partition
+ )
+ );
}
} else {
final ByteBuffer timeAndValue = ByteBuffer.wrap(record.value());
final long time = timeAndValue.getLong();
final byte[] value = new byte[record.value().length - 8];
timeAndValue.get(value);
-
- cleanPut(
- time,
- key,
- new ContextualRecord(
- value,
- new ProcessorRecordContext(
- record.timestamp(),
- record.offset(),
- record.partition(),
- record.topic(),
- record.headers()
+ if (record.headers().lastHeader("v") == null) {
+ cleanPut(
+ time,
+ key,
+ new ContextualRecord(
+ value,
+ new ProcessorRecordContext(
+ record.timestamp(),
+ record.offset(),
+ record.partition(),
+ record.topic(),
+ record.headers()
+ )
)
+ );
+ } else if (V_1_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) {
+ final ContextualRecord contextualRecord = ContextualRecord.deserialize(ByteBuffer.wrap(value));
+
+ cleanPut(
+ time,
+ key,
+ contextualRecord
+ );
+ } else {
+ throw new IllegalArgumentException("Restoring apparently invalid changelog record: " + record);
+ }
+ }
+ if (record.partition() != partition) {
+ throw new IllegalStateException(
+ String.format(
+ "record partition [%d] is being restored by the wrong suppress partition [%d]",
+ record.partition(),
+ partition
)
);
}
@@ -298,6 +371,12 @@ public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuf
// predicate being true means we read one record, call the callback, and then remove it
while (next != null && predicate.get()) {
+ if (next.getKey().time != minTimestamp) {
+ throw new IllegalStateException(
+ "minTimestamp [" + minTimestamp + "] did not match the actual min timestamp [" +
+ next.getKey().time + "]"
+ );
+ }
callback.accept(new KeyValue<>(next.getKey().key, next.getValue()));
delegate.remove();
@@ -305,7 +384,7 @@ public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuf
dirtyKeys.add(next.getKey().key);
- memBufferSize = memBufferSize - computeRecordSize(next.getKey().key, next.getValue());
+ memBufferSize -= computeRecordSize(next.getKey().key, next.getValue());
// peek at the next record so we can update the minTimestamp
if (delegate.hasNext()) {
@@ -327,8 +406,11 @@ public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuf
@Override
public void put(final long time,
final Bytes key,
- final ContextualRecord value) {
- cleanPut(time, key, value);
+ final ContextualRecord contextualRecord) {
+ requireNonNull(contextualRecord.value(), "value cannot be null");
+ requireNonNull(contextualRecord.recordContext(), "recordContext cannot be null");
+
+ cleanPut(time, key, contextualRecord);
dirtyKeys.add(key);
updateBufferMetrics();
}
@@ -344,7 +426,7 @@ public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuf
index.put(key, nextKey);
sortedMap.put(nextKey, value);
minTimestamp = Math.min(minTimestamp, time);
- memBufferSize = memBufferSize + computeRecordSize(key, value);
+ memBufferSize += computeRecordSize(key, value);
} else {
final ContextualRecord removedValue = sortedMap.put(previousKey, value);
memBufferSize =
@@ -369,7 +451,7 @@ public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuf
return minTimestamp;
}
- private long computeRecordSize(final Bytes key, final ContextualRecord value) {
+ private static long computeRecordSize(final Bytes key, final ContextualRecord value) {
long size = 0L;
size += 8; // buffer time
size += key.get().length;
@@ -383,4 +465,19 @@ public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuf
bufferSizeSensor.record(memBufferSize);
bufferCountSensor.record(index.size());
}
+
+ @Override
+ public String toString() {
+ return "InMemoryTimeOrderedKeyValueBuffer{" +
+ "storeName='" + storeName + '\'' +
+ ", changelogTopic='" + changelogTopic + '\'' +
+ ", open=" + open +
+ ", loggingEnabled=" + loggingEnabled +
+ ", minTimestamp=" + minTimestamp +
+ ", memBufferSize=" + memBufferSize +
+ ", \n\tdirtyKeys=" + dirtyKeys +
+ ", \n\tindex=" + index +
+ ", \n\tsortedMap=" + sortedMap +
+ '}';
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/KeyValueTimestamp.java b/streams/src/test/java/org/apache/kafka/streams/KeyValueTimestamp.java
index 4213112..b578562 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KeyValueTimestamp.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KeyValueTimestamp.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams;
+import java.util.Objects;
+
public class KeyValueTimestamp<K, V> {
private final K key;
private final V value;
@@ -43,4 +45,19 @@ public class KeyValueTimestamp<K, V> {
public String toString() {
return "KeyValueTimestamp{key=" + key + ", value=" + value + ", timestamp=" + timestamp + '}';
}
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ final KeyValueTimestamp<?, ?> that = (KeyValueTimestamp<?, ?>) o;
+ return timestamp == that.timestamp &&
+ Objects.equals(key, that.key) &&
+ Objects.equals(value, that.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(key, value, timestamp);
+ }
}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
index 6f759bc..b9752c4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
@@ -34,11 +34,13 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
@@ -48,13 +50,18 @@ import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.util.Arrays;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Locale;
+import java.util.Optional;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import static java.lang.Long.MAX_VALUE;
import static java.time.Duration.ofMillis;
@@ -71,9 +78,10 @@ import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecord
import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
@RunWith(Parameterized.class)
-@Category({IntegrationTest.class})
+@Category(IntegrationTest.class)
public class SuppressionDurabilityIntegrationTest {
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
@@ -88,26 +96,13 @@ public class SuppressionDurabilityIntegrationTest {
private static final int COMMIT_INTERVAL = 100;
private final boolean eosEnabled;
- public SuppressionDurabilityIntegrationTest(final boolean eosEnabled) {
- this.eosEnabled = eosEnabled;
- }
-
@Parameters(name = "{index}: eosEnabled={0}")
public static Collection<Object[]> parameters() {
- return Arrays.asList(new Object[] {false}, new Object[] {true});
+ return asList(new Object[] {false}, new Object[] {true});
}
- private KTable<String, Long> buildCountsTable(final String input, final StreamsBuilder builder) {
- return builder
- .table(
- input,
- Consumed.with(STRING_SERDE, STRING_SERDE),
- Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(STRING_SERDE, STRING_SERDE)
- .withCachingDisabled()
- .withLoggingDisabled()
- )
- .groupBy((k, v) -> new KeyValue<>(v, k), Grouped.with(STRING_SERDE, STRING_SERDE))
- .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts").withCachingDisabled());
+ public SuppressionDurabilityIntegrationTest(final boolean eosEnabled) {
+ this.eosEnabled = eosEnabled;
}
@Test
@@ -115,13 +110,21 @@ public class SuppressionDurabilityIntegrationTest {
final String testId = "-shouldRecoverBufferAfterShutdown";
final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
final String input = "input" + testId;
+ final String storeName = "counts";
final String outputSuppressed = "output-suppressed" + testId;
final String outputRaw = "output-raw" + testId;
- cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed);
+ // create multiple partitions as a trap, in case the buffer doesn't properly set the
+ // partition on the records, but instead relies on the default key partitioner
+ cleanStateBeforeTest(CLUSTER, 2, input, outputRaw, outputSuppressed);
final StreamsBuilder builder = new StreamsBuilder();
- final KTable<String, Long> valueCounts = buildCountsTable(input, builder);
+ final KTable<String, Long> valueCounts = builder
+ .stream(
+ input,
+ Consumed.with(STRING_SERDE, STRING_SERDE))
+ .groupByKey()
+ .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(storeName).withCachingDisabled());
final KStream<String, Long> suppressedCounts = valueCounts
.suppress(untilTimeLimit(ofMillis(MAX_VALUE), maxRecords(3L).emitEarlyWhenFull()))
@@ -130,11 +133,16 @@ public class SuppressionDurabilityIntegrationTest {
final AtomicInteger eventCount = new AtomicInteger(0);
suppressedCounts.foreach((key, value) -> eventCount.incrementAndGet());
+ // expect all post-suppress records to keep the right input topic
+ final MetadataValidator metadataValidator = new MetadataValidator(input);
+
suppressedCounts
+ .transform(metadataValidator)
.to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
valueCounts
.toStream()
+ .transform(metadataValidator)
.to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
final Properties streamsConfig = mkProperties(mkMap(
@@ -149,7 +157,9 @@ public class SuppressionDurabilityIntegrationTest {
KafkaStreams driver = getStartedStreams(streamsConfig, builder, true);
try {
// start by putting some stuff in the buffer
- produceSynchronously(
+ // note, we send all input records to partition 0
+ // to make sure that suppress doesn't erroneously send records to other partitions.
+ produceSynchronouslyToPartitionZero(
input,
asList(
new KeyValueTimestamp<>("k1", "v1", scaledTime(1L)),
@@ -159,16 +169,16 @@ public class SuppressionDurabilityIntegrationTest {
);
verifyOutput(
outputRaw,
- asList(
- new KeyValueTimestamp<>("v1", 1L, scaledTime(1L)),
- new KeyValueTimestamp<>("v2", 1L, scaledTime(2L)),
- new KeyValueTimestamp<>("v3", 1L, scaledTime(3L))
- )
+ new HashSet<>(asList(
+ new KeyValueTimestamp<>("k1", 1L, scaledTime(1L)),
+ new KeyValueTimestamp<>("k2", 1L, scaledTime(2L)),
+ new KeyValueTimestamp<>("k3", 1L, scaledTime(3L))
+ ))
);
assertThat(eventCount.get(), is(0));
// flush two of the first three events out.
- produceSynchronously(
+ produceSynchronouslyToPartitionZero(
input,
asList(
new KeyValueTimestamp<>("k4", "v4", scaledTime(4L)),
@@ -177,17 +187,17 @@ public class SuppressionDurabilityIntegrationTest {
);
verifyOutput(
outputRaw,
- asList(
- new KeyValueTimestamp<>("v4", 1L, scaledTime(4L)),
- new KeyValueTimestamp<>("v5", 1L, scaledTime(5L))
- )
+ new HashSet<>(asList(
+ new KeyValueTimestamp<>("k4", 1L, scaledTime(4L)),
+ new KeyValueTimestamp<>("k5", 1L, scaledTime(5L))
+ ))
);
assertThat(eventCount.get(), is(2));
verifyOutput(
outputSuppressed,
asList(
- new KeyValueTimestamp<>("v1", 1L, scaledTime(1L)),
- new KeyValueTimestamp<>("v2", 1L, scaledTime(2L))
+ new KeyValueTimestamp<>("k1", 1L, scaledTime(1L)),
+ new KeyValueTimestamp<>("k2", 1L, scaledTime(2L))
)
);
@@ -201,7 +211,7 @@ public class SuppressionDurabilityIntegrationTest {
// flush those recovered buffered events out.
- produceSynchronously(
+ produceSynchronouslyToPartitionZero(
input,
asList(
new KeyValueTimestamp<>("k6", "v6", scaledTime(6L)),
@@ -211,29 +221,78 @@ public class SuppressionDurabilityIntegrationTest {
);
verifyOutput(
outputRaw,
- asList(
- new KeyValueTimestamp<>("v6", 1L, scaledTime(6L)),
- new KeyValueTimestamp<>("v7", 1L, scaledTime(7L)),
- new KeyValueTimestamp<>("v8", 1L, scaledTime(8L))
- )
+ new HashSet<>(asList(
+ new KeyValueTimestamp<>("k6", 1L, scaledTime(6L)),
+ new KeyValueTimestamp<>("k7", 1L, scaledTime(7L)),
+ new KeyValueTimestamp<>("k8", 1L, scaledTime(8L))
+ ))
);
- assertThat(eventCount.get(), is(5));
+ assertThat("suppress has apparently produced some duplicates. There should only be 5 output events.",
+ eventCount.get(), is(5));
+
verifyOutput(
outputSuppressed,
asList(
- new KeyValueTimestamp<>("v3", 1L, scaledTime(3L)),
- new KeyValueTimestamp<>("v4", 1L, scaledTime(4L)),
- new KeyValueTimestamp<>("v5", 1L, scaledTime(5L))
+ new KeyValueTimestamp<>("k3", 1L, scaledTime(3L)),
+ new KeyValueTimestamp<>("k4", 1L, scaledTime(4L)),
+ new KeyValueTimestamp<>("k5", 1L, scaledTime(5L))
)
);
+ metadataValidator.raiseExceptionIfAny();
+
} finally {
driver.close();
cleanStateAfterTest(CLUSTER, driver);
}
}
- private void verifyOutput(final String topic, final List<KeyValueTimestamp<String, Long>> keyValueTimestamps) {
+ private static final class MetadataValidator implements TransformerSupplier<String, Long, KeyValue<String, Long>> {
+ private static final Logger LOG = LoggerFactory.getLogger(MetadataValidator.class);
+ private final AtomicReference<Throwable> firstException = new AtomicReference<>();
+ private final String topic;
+
+ MetadataValidator(final String topic) {
+ this.topic = topic;
+ }
+
+ @Override
+ public Transformer<String, Long, KeyValue<String, Long>> get() {
+ return new Transformer<String, Long, KeyValue<String, Long>>() {
+ private ProcessorContext context;
+
+ @Override
+ public void init(final ProcessorContext context) {
+ this.context = context;
+ }
+
+ @Override
+ public KeyValue<String, Long> transform(final String key, final Long value) {
+ try {
+ assertThat(context.topic(), equalTo(topic));
+ } catch (final Throwable e) {
+ firstException.compareAndSet(null, e);
+ LOG.error("Validation Failed", e);
+ }
+ return new KeyValue<>(key, value);
+ }
+
+ @Override
+ public void close() {
+
+ }
+ };
+ }
+
+ void raiseExceptionIfAny() {
+ final Throwable exception = firstException.get();
+ if (exception != null) {
+ throw new AssertionError("Got an exception during run", exception);
+ }
+ }
+ }
+
+ private static void verifyOutput(final String topic, final List<KeyValueTimestamp<String, Long>> keyValueTimestamps) {
final Properties properties = mkProperties(
mkMap(
mkEntry(ConsumerConfig.GROUP_ID_CONFIG, "test-group"),
@@ -243,24 +302,35 @@ public class SuppressionDurabilityIntegrationTest {
)
);
IntegrationTestUtils.verifyKeyValueTimestamps(properties, topic, keyValueTimestamps);
+ }
+ private static void verifyOutput(final String topic, final Set<KeyValueTimestamp<String, Long>> keyValueTimestamps) {
+ final Properties properties = mkProperties(
+ mkMap(
+ mkEntry(ConsumerConfig.GROUP_ID_CONFIG, "test-group"),
+ mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+ mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ((Deserializer<String>) STRING_DESERIALIZER).getClass().getName()),
+ mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ((Deserializer<Long>) LONG_DESERIALIZER).getClass().getName())
+ )
+ );
+ IntegrationTestUtils.verifyKeyValueTimestamps(properties, topic, keyValueTimestamps);
}
/**
* scaling to ensure that there are commits in between the various test events,
* just to exercise that everything works properly in the presence of commits.
*/
- private long scaledTime(final long unscaledTime) {
+ private static long scaledTime(final long unscaledTime) {
return COMMIT_INTERVAL * 2 * unscaledTime;
}
- private void produceSynchronously(final String topic, final List<KeyValueTimestamp<String, String>> toProduce) {
+ private static void produceSynchronouslyToPartitionZero(final String topic, final List<KeyValueTimestamp<String, String>> toProduce) {
final Properties producerConfig = mkProperties(mkMap(
mkEntry(ProducerConfig.CLIENT_ID_CONFIG, "anything"),
mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ((Serializer<String>) STRING_SERIALIZER).getClass().getName()),
mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ((Serializer<String>) STRING_SERIALIZER).getClass().getName()),
mkEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())
));
- IntegrationTestUtils.produceSynchronously(producerConfig, false, topic, toProduce);
+ IntegrationTestUtils.produceSynchronously(producerConfig, false, topic, Optional.of(0), toProduce);
}
}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
index 1991e15..46a4f13 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
@@ -44,6 +44,7 @@ import org.junit.experimental.categories.Category;
import java.time.Duration;
import java.util.List;
import java.util.Locale;
+import java.util.Optional;
import java.util.Properties;
import static java.lang.Long.MAX_VALUE;
@@ -197,7 +198,7 @@ public class SuppressionIntegrationTest {
mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ((Serializer<String>) STRING_SERIALIZER).getClass().getName()),
mkEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())
));
- IntegrationTestUtils.produceSynchronously(producerConfig, false, topic, toProduce);
+ IntegrationTestUtils.produceSynchronously(producerConfig, false, topic, Optional.empty(), toProduce);
}
private void verifyErrorShutdown(final KafkaStreams driver) throws InterruptedException {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index e61b312..74cac06 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -58,11 +58,16 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
/**
* Utility functions to make integration testing more convenient.
*/
@@ -117,10 +122,14 @@ public class IntegrationTestUtils {
}
public static void cleanStateBeforeTest(final EmbeddedKafkaCluster cluster, final String... topics) {
+ cleanStateBeforeTest(cluster, 1, topics);
+ }
+
+ public static void cleanStateBeforeTest(final EmbeddedKafkaCluster cluster, final int partitionCount, final String... topics) {
try {
cluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT);
for (final String topic : topics) {
- cluster.createTopic(topic, 1, 1);
+ cluster.createTopic(topic, partitionCount, 1);
}
} catch (final InterruptedException e) {
throw new RuntimeException(e);
@@ -147,7 +156,7 @@ public class IntegrationTestUtils {
public static <K, V> void produceKeyValuesSynchronously(
final String topic, final Collection<KeyValue<K, V>> records, final Properties producerConfig, final Time time)
throws ExecutionException, InterruptedException {
- IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, time, false);
+ produceKeyValuesSynchronously(topic, records, producerConfig, time, false);
}
/**
@@ -162,7 +171,7 @@ public class IntegrationTestUtils {
public static <K, V> void produceKeyValuesSynchronously(
final String topic, final Collection<KeyValue<K, V>> records, final Properties producerConfig, final Headers headers, final Time time)
throws ExecutionException, InterruptedException {
- IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, headers, time, false);
+ produceKeyValuesSynchronously(topic, records, producerConfig, headers, time, false);
}
/**
@@ -177,7 +186,7 @@ public class IntegrationTestUtils {
public static <K, V> void produceKeyValuesSynchronously(
final String topic, final Collection<KeyValue<K, V>> records, final Properties producerConfig, final Time time, final boolean enableTransactions)
throws ExecutionException, InterruptedException {
- IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, null, time, enableTransactions);
+ produceKeyValuesSynchronously(topic, records, producerConfig, null, time, enableTransactions);
}
/**
@@ -221,7 +230,7 @@ public class IntegrationTestUtils {
final Properties producerConfig,
final Long timestamp)
throws ExecutionException, InterruptedException {
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, timestamp, false);
+ produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, timestamp, false);
}
/**
@@ -239,7 +248,7 @@ public class IntegrationTestUtils {
final Long timestamp,
final boolean enableTransactions)
throws ExecutionException, InterruptedException {
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, null, timestamp, enableTransactions);
+ produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, null, timestamp, enableTransactions);
}
/**
@@ -277,20 +286,19 @@ public class IntegrationTestUtils {
}
public static <V, K> void produceSynchronously(final Properties producerConfig,
- final boolean eos,
- final String topic,
- final List<KeyValueTimestamp<K, V>> toProduce) {
+ final boolean eos,
+ final String topic,
+ final Optional<Integer> partition,
+ final List<KeyValueTimestamp<K, V>> toProduce) {
try (final Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
- // TODO: test EOS
- //noinspection ConstantConditions
- if (false) {
+ if (eos) {
producer.initTransactions();
producer.beginTransaction();
}
final LinkedList<Future<RecordMetadata>> futures = new LinkedList<>();
for (final KeyValueTimestamp<K, V> record : toProduce) {
final Future<RecordMetadata> f = producer.send(
- new ProducerRecord<>(topic, null, record.timestamp(), record.key(), record.value(), null)
+ new ProducerRecord<>(topic, partition.orElse(null), record.timestamp(), record.key(), record.value(), null)
);
futures.add(f);
}
@@ -351,7 +359,7 @@ public class IntegrationTestUtils {
final Properties producerConfig,
final Time time)
throws ExecutionException, InterruptedException {
- IntegrationTestUtils.produceValuesSynchronously(topic, records, producerConfig, time, false);
+ produceValuesSynchronously(topic, records, producerConfig, time, false);
}
/**
@@ -677,7 +685,7 @@ public class IntegrationTestUtils {
final List<ConsumerRecord<String, Long>> results;
try {
- results = IntegrationTestUtils.waitUntilMinRecordsReceived(consumerConfig, topic, expected.size());
+ results = waitUntilMinRecordsReceived(consumerConfig, topic, expected.size());
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
@@ -696,6 +704,28 @@ public class IntegrationTestUtils {
}
}
+ public static void verifyKeyValueTimestamps(final Properties consumerConfig,
+ final String topic,
+ final Set<KeyValueTimestamp<String, Long>> expected) {
+ final List<ConsumerRecord<String, Long>> results;
+ try {
+ results = waitUntilMinRecordsReceived(consumerConfig, topic, expected.size());
+ } catch (final InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ if (results.size() != expected.size()) {
+ throw new AssertionError(printRecords(results) + " != " + expected);
+ }
+
+ final Set<KeyValueTimestamp<String, Long>> actual =
+ results.stream()
+ .map(result -> new KeyValueTimestamp<>(result.key(), result.value(), result.timestamp()))
+ .collect(Collectors.toSet());
+
+ assertThat(actual, equalTo(expected));
+ }
+
private static <K, V> void compareKeyValueTimestamp(final ConsumerRecord<K, V> record,
final K expectedKey,
final V expectedValue,
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
index 354fdd5..4b182cb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
@@ -56,7 +56,6 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.fail;
-@SuppressWarnings("PointlessArithmeticExpression")
public class KTableSuppressProcessorTest {
private static final long ARBITRARY_LONG = 5L;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
new file mode 100644
index 0000000..2953953
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
@@ -0,0 +1,604 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.test.MockInternalProcessorContext;
+import org.apache.kafka.test.MockInternalProcessorContext.MockRecordCollector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.fail;
+
+@RunWith(Parameterized.class)
+public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer> {
+ private static final RecordHeaders V_1_CHANGELOG_HEADERS =
+ new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})});
+
+ private static final String APP_ID = "test-app";
+ private final Function<String, B> bufferSupplier;
+ private final String testName;
+
+ // As we add more buffer implementations/configurations, we can add them here
+ @Parameterized.Parameters(name = "{index}: test={0}")
+ public static Collection<Object[]> parameters() {
+ return singletonList(
+ new Object[] {
+ "in-memory buffer",
+ (Function<String, InMemoryTimeOrderedKeyValueBuffer>) name ->
+ (InMemoryTimeOrderedKeyValueBuffer) new InMemoryTimeOrderedKeyValueBuffer
+ .Builder(name)
+ .build()
+ }
+ );
+ }
+
+ public TimeOrderedKeyValueBufferTest(final String testName, final Function<String, B> bufferSupplier) {
+ this.testName = testName + "_" + new Random().nextInt(Integer.MAX_VALUE);
+ this.bufferSupplier = bufferSupplier;
+ }
+
+ private static MockInternalProcessorContext makeContext() {
+ final Properties properties = new Properties();
+ properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
+ properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
+
+ final TaskId taskId = new TaskId(0, 0);
+
+ final MockInternalProcessorContext context = new MockInternalProcessorContext(properties, taskId, TestUtils.tempDirectory());
+ context.setRecordCollector(new MockRecordCollector());
+
+ return context;
+ }
+
+
+ private static void cleanup(final MockInternalProcessorContext context, final TimeOrderedKeyValueBuffer buffer) {
+ try {
+ buffer.close();
+ Utils.delete(context.stateDir());
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void shouldInit() {
+ final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+ final MockInternalProcessorContext context = makeContext();
+ buffer.init(context, buffer);
+ cleanup(context, buffer);
+ }
+
+ @Test
+ public void shouldAcceptData() {
+ final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+ final MockInternalProcessorContext context = makeContext();
+ buffer.init(context, buffer);
+ putRecord(buffer, context, "2p93nf", 0, "asdf");
+ cleanup(context, buffer);
+ }
+
+ @Test
+ public void shouldRejectNullValues() {
+ final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+ final MockInternalProcessorContext context = makeContext();
+ buffer.init(context, buffer);
+ try {
+ buffer.put(0, getBytes("asdf"), new ContextualRecord(
+ null,
+ new ProcessorRecordContext(0, 0, 0, "topic")
+ ));
+ fail("expected an exception");
+ } catch (final NullPointerException expected) {
+ // expected
+ }
+ cleanup(context, buffer);
+ }
+
+ private static ContextualRecord getRecord(final String value) {
+ return getRecord(value, 0L);
+ }
+
+ private static ContextualRecord getRecord(final String value, final long timestamp) {
+ return new ContextualRecord(
+ value.getBytes(UTF_8),
+ new ProcessorRecordContext(timestamp, 0, 0, "topic")
+ );
+ }
+
+ private static Bytes getBytes(final String key) {
+ return Bytes.wrap(key.getBytes(UTF_8));
+ }
+
+ @Test
+ public void shouldRemoveData() {
+ final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+ final MockInternalProcessorContext context = makeContext();
+ buffer.init(context, buffer);
+ putRecord(buffer, context, "qwer", 0, "asdf");
+ assertThat(buffer.numRecords(), is(1));
+ buffer.evictWhile(() -> true, kv -> { });
+ assertThat(buffer.numRecords(), is(0));
+ cleanup(context, buffer);
+ }
+
+ @Test
+ public void shouldRespectEvictionPredicate() {
+ final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+ final MockInternalProcessorContext context = makeContext();
+ buffer.init(context, buffer);
+ final Bytes firstKey = getBytes("asdf");
+ final ContextualRecord firstRecord = getRecord("eyt");
+ putRecord(0, buffer, context, firstRecord, firstKey);
+ putRecord(buffer, context, "rtg", 1, "zxcv");
+ assertThat(buffer.numRecords(), is(2));
+ final List<KeyValue<Bytes, ContextualRecord>> evicted = new LinkedList<>();
+ buffer.evictWhile(() -> buffer.numRecords() > 1, evicted::add);
+ assertThat(buffer.numRecords(), is(1));
+ assertThat(evicted, is(singletonList(new KeyValue<>(firstKey, firstRecord))));
+ cleanup(context, buffer);
+ }
+
+ @Test
+ public void shouldTrackCount() {
+ final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+ final MockInternalProcessorContext context = makeContext();
+ buffer.init(context, buffer);
+ putRecord(buffer, context, "oin", 0, "asdf");
+ assertThat(buffer.numRecords(), is(1));
+ putRecord(buffer, context, "wekjn", 1, "asdf");
+ assertThat(buffer.numRecords(), is(1));
+ putRecord(buffer, context, "24inf", 0, "zxcv");
+ assertThat(buffer.numRecords(), is(2));
+ cleanup(context, buffer);
+ }
+
+ @Test
+ public void shouldTrackSize() {
+ final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+ final MockInternalProcessorContext context = makeContext();
+ buffer.init(context, buffer);
+ putRecord(buffer, context, "23roni", 0, "asdf");
+ assertThat(buffer.bufferSize(), is(43L));
+ putRecord(buffer, context, "3l", 1, "asdf");
+ assertThat(buffer.bufferSize(), is(39L));
+ putRecord(buffer, context, "qfowin", 0, "zxcv");
+ assertThat(buffer.bufferSize(), is(82L));
+ cleanup(context, buffer);
+ }
+
+ @Test
+ public void shouldTrackMinTimestamp() {
+ final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+ final MockInternalProcessorContext context = makeContext();
+ buffer.init(context, buffer);
+ putRecord(buffer, context, "2093j", 1, "asdf");
+ assertThat(buffer.minTimestamp(), is(1L));
+ putRecord(buffer, context, "3gon4i", 0, "zxcv");
+ assertThat(buffer.minTimestamp(), is(0L));
+ cleanup(context, buffer);
+ }
+
+ private static void putRecord(final TimeOrderedKeyValueBuffer buffer,
+ final MockInternalProcessorContext context,
+ final String value,
+ final int time,
+ final String key) {
+ putRecord(time, buffer, context, getRecord(value), getBytes(key));
+ }
+
+ private static void putRecord(final int time,
+ final TimeOrderedKeyValueBuffer buffer,
+ final MockInternalProcessorContext context,
+ final ContextualRecord firstRecord,
+ final Bytes firstKey) {
+ context.setRecordContext(firstRecord.recordContext());
+ buffer.put(time, firstKey, firstRecord);
+ }
+
+ @Test
+ public void shouldEvictOldestAndUpdateSizeAndCountAndMinTimestamp() {
+ final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+ final MockInternalProcessorContext context = makeContext();
+ buffer.init(context, buffer);
+
+ putRecord(buffer, context, "o23i4", 1, "zxcv");
+ assertThat(buffer.numRecords(), is(1));
+ assertThat(buffer.bufferSize(), is(42L));
+ assertThat(buffer.minTimestamp(), is(1L));
+
+ putRecord(buffer, context, "3ng", 0, "asdf");
+ assertThat(buffer.numRecords(), is(2));
+ assertThat(buffer.bufferSize(), is(82L));
+ assertThat(buffer.minTimestamp(), is(0L));
+
+ final AtomicInteger callbackCount = new AtomicInteger(0);
+ buffer.evictWhile(() -> true, kv -> {
+ switch (callbackCount.incrementAndGet()) {
+ case 1: {
+ assertThat(new String(kv.key.get(), UTF_8), is("asdf"));
+ assertThat(buffer.numRecords(), is(2));
+ assertThat(buffer.bufferSize(), is(82L));
+ assertThat(buffer.minTimestamp(), is(0L));
+ break;
+ }
+ case 2: {
+ assertThat(new String(kv.key.get(), UTF_8), is("zxcv"));
+ assertThat(buffer.numRecords(), is(1));
+ assertThat(buffer.bufferSize(), is(42L));
+ assertThat(buffer.minTimestamp(), is(1L));
+ break;
+ }
+ default: {
+ fail("too many invocations");
+ break;
+ }
+ }
+ });
+ assertThat(callbackCount.get(), is(2));
+ assertThat(buffer.numRecords(), is(0));
+ assertThat(buffer.bufferSize(), is(0L));
+ assertThat(buffer.minTimestamp(), is(Long.MAX_VALUE));
+ cleanup(context, buffer);
+ }
+
+ @Test
+ public void shouldFlush() {
+ final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+ final MockInternalProcessorContext context = makeContext();
+ buffer.init(context, buffer);
+ putRecord(2, buffer, context, getRecord("2093j", 0L), getBytes("asdf"));
+ putRecord(1, buffer, context, getRecord("3gon4i", 1L), getBytes("zxcv"));
+ putRecord(0, buffer, context, getRecord("deadbeef", 2L), getBytes("deleteme"));
+
+ // replace "deleteme" with a tombstone
+ buffer.evictWhile(() -> buffer.minTimestamp() < 1, kv -> { });
+
+ // flush everything to the changelog
+ buffer.flush();
+
+ // the buffer should serialize the buffer time and the value as byte[],
+ // which we can't compare for equality using ProducerRecord.
+ // As a workaround, I'm deserializing them and shoving them in a KeyValue, just for ease of testing.
+
+ final List<ProducerRecord<String, KeyValue<Long, ContextualRecord>>> collected =
+ ((MockRecordCollector) context.recordCollector())
+ .collected()
+ .stream()
+ .map(pr -> {
+ final KeyValue<Long, ContextualRecord> niceValue;
+ if (pr.value() == null) {
+ niceValue = null;
+ } else {
+ final byte[] timestampAndValue = pr.value();
+ final ByteBuffer wrap = ByteBuffer.wrap(timestampAndValue);
+ final long timestamp = wrap.getLong();
+ final ContextualRecord contextualRecord = ContextualRecord.deserialize(wrap);
+ niceValue = new KeyValue<>(timestamp, contextualRecord);
+ }
+
+ return new ProducerRecord<>(pr.topic(),
+ pr.partition(),
+ pr.timestamp(),
+ new String(pr.key(), UTF_8),
+ niceValue,
+ pr.headers());
+ })
+ .collect(Collectors.toList());
+
+ assertThat(collected, is(asList(
+ new ProducerRecord<>(APP_ID + "-" + testName + "-changelog",
+ 0, // Producer will assign
+ null,
+ "deleteme",
+ null,
+ new RecordHeaders()
+ ),
+ new ProducerRecord<>(APP_ID + "-" + testName + "-changelog",
+ 0,
+ null,
+ "zxcv",
+ new KeyValue<>(1L, getRecord("3gon4i", 1)),
+ V_1_CHANGELOG_HEADERS
+ ),
+ new ProducerRecord<>(APP_ID + "-" + testName + "-changelog",
+ 0,
+ null,
+ "asdf",
+ new KeyValue<>(2L, getRecord("2093j", 0)),
+ V_1_CHANGELOG_HEADERS
+ )
+ )));
+
+ cleanup(context, buffer);
+ }
+
+
+ @Test
+ public void shouldRestoreOldFormat() {
+ final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+ final MockInternalProcessorContext context = makeContext();
+ buffer.init(context, buffer);
+
+ final RecordBatchingStateRestoreCallback stateRestoreCallback =
+ (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
+
+ context.setRecordContext(new ProcessorRecordContext(0, 0, 0, ""));
+
+ stateRestoreCallback.restoreBatch(asList(
+ new ConsumerRecord<>("changelog-topic",
+ 0,
+ 0,
+ 0,
+ TimestampType.CREATE_TIME,
+ -1,
+ -1,
+ -1,
+ "todelete".getBytes(UTF_8),
+ ByteBuffer.allocate(Long.BYTES + 6).putLong(0L).put("doomed".getBytes(UTF_8)).array()),
+ new ConsumerRecord<>("changelog-topic",
+ 0,
+ 1,
+ 1,
+ TimestampType.CREATE_TIME,
+ -1,
+ -1,
+ -1,
+ "asdf".getBytes(UTF_8),
+ ByteBuffer.allocate(Long.BYTES + 4).putLong(2L).put("qwer".getBytes(UTF_8)).array()),
+ new ConsumerRecord<>("changelog-topic",
+ 0,
+ 2,
+ 2,
+ TimestampType.CREATE_TIME,
+ -1,
+ -1,
+ -1,
+ "zxcv".getBytes(UTF_8),
+ ByteBuffer.allocate(Long.BYTES + 5).putLong(1L).put("3o4im".getBytes(UTF_8)).array())
+ ));
+
+ assertThat(buffer.numRecords(), is(3));
+ assertThat(buffer.minTimestamp(), is(0L));
+ assertThat(buffer.bufferSize(), is(160L));
+
+ stateRestoreCallback.restoreBatch(singletonList(
+ new ConsumerRecord<>("changelog-topic",
+ 0,
+ 3,
+ 3,
+ TimestampType.CREATE_TIME,
+ -1,
+ -1,
+ -1,
+ "todelete".getBytes(UTF_8),
+ null)
+ ));
+
+ assertThat(buffer.numRecords(), is(2));
+ assertThat(buffer.minTimestamp(), is(1L));
+ assertThat(buffer.bufferSize(), is(103L));
+
+ // flush the buffer into a list in buffer order so we can make assertions about the contents.
+
+ final List<KeyValue<Bytes, ContextualRecord>> evicted = new LinkedList<>();
+ buffer.evictWhile(() -> true, evicted::add);
+
+ // Several things to note:
+ // * The buffered records are ordered according to their buffer time (serialized in the value of the changelog)
+ // * The record timestamps are properly restored, and not conflated with the record's buffer time.
+ // * The keys and values are properly restored
+ // * The record topic is set to the changelog topic. This was an oversight in the original implementation,
+ // which is fixed in changelog format v1. But upgraded applications still need to be able to handle the
+ // original format.
+
+ assertThat(evicted, is(asList(
+ new KeyValue<>(
+ getBytes("zxcv"),
+ new ContextualRecord("3o4im".getBytes(UTF_8),
+ new ProcessorRecordContext(2,
+ 2,
+ 0,
+ "changelog-topic",
+ new RecordHeaders()))),
+ new KeyValue<>(
+ getBytes("asdf"),
+ new ContextualRecord("qwer".getBytes(UTF_8),
+ new ProcessorRecordContext(1,
+ 1,
+ 0,
+ "changelog-topic",
+ new RecordHeaders())))
+ )));
+
+ cleanup(context, buffer);
+ }
+
+ @Test
+ public void shouldRestoreNewFormat() {
+ final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+ final MockInternalProcessorContext context = makeContext();
+ buffer.init(context, buffer);
+
+ final RecordBatchingStateRestoreCallback stateRestoreCallback =
+ (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
+
+ context.setRecordContext(new ProcessorRecordContext(0, 0, 0, ""));
+
+ final RecordHeaders v1FlagHeaders = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})});
+
+ final byte[] todeleteValue = getRecord("doomed", 0).serialize();
+ final byte[] asdfValue = getRecord("qwer", 1).serialize();
+ final byte[] zxcvValue = getRecord("3o4im", 2).serialize();
+ stateRestoreCallback.restoreBatch(asList(
+ new ConsumerRecord<>("changelog-topic",
+ 0,
+ 0,
+ 999,
+ TimestampType.CREATE_TIME,
+ -1L,
+ -1,
+ -1,
+ "todelete".getBytes(UTF_8),
+ ByteBuffer.allocate(Long.BYTES + todeleteValue.length).putLong(0L).put(todeleteValue).array(),
+ v1FlagHeaders),
+ new ConsumerRecord<>("changelog-topic",
+ 0,
+ 1,
+ 9999,
+ TimestampType.CREATE_TIME,
+ -1L,
+ -1,
+ -1,
+ "asdf".getBytes(UTF_8),
+ ByteBuffer.allocate(Long.BYTES + asdfValue.length).putLong(2L).put(asdfValue).array(),
+ v1FlagHeaders),
+ new ConsumerRecord<>("changelog-topic",
+ 0,
+ 2,
+ 99,
+ TimestampType.CREATE_TIME,
+ -1L,
+ -1,
+ -1,
+ "zxcv".getBytes(UTF_8),
+ ByteBuffer.allocate(Long.BYTES + zxcvValue.length).putLong(1L).put(zxcvValue).array(),
+ v1FlagHeaders)
+ ));
+
+ assertThat(buffer.numRecords(), is(3));
+ assertThat(buffer.minTimestamp(), is(0L));
+ assertThat(buffer.bufferSize(), is(130L));
+
+ stateRestoreCallback.restoreBatch(singletonList(
+ new ConsumerRecord<>("changelog-topic",
+ 0,
+ 3,
+ 3,
+ TimestampType.CREATE_TIME,
+ -1L,
+ -1,
+ -1,
+ "todelete".getBytes(UTF_8),
+ null)
+ ));
+
+ assertThat(buffer.numRecords(), is(2));
+ assertThat(buffer.minTimestamp(), is(1L));
+ assertThat(buffer.bufferSize(), is(83L));
+
+ // flush the buffer into a list in buffer order so we can make assertions about the contents.
+
+ final List<KeyValue<Bytes, ContextualRecord>> evicted = new LinkedList<>();
+ buffer.evictWhile(() -> true, evicted::add);
+
+ // Several things to note:
+ // * The buffered records are ordered according to their buffer time (serialized in the value of the changelog)
+ // * The record timestamps are properly restored, and not conflated with the record's buffer time.
+ // * The keys and values are properly restored
+ // * The record topic is set to the original input topic, *not* the changelog topic
+ // * The record offset preserves the original input record's offset, *not* the offset of the changelog record
+
+
+ assertThat(evicted, is(asList(
+ new KeyValue<>(
+ getBytes("zxcv"),
+ new ContextualRecord("3o4im".getBytes(UTF_8),
+ new ProcessorRecordContext(2,
+ 0,
+ 0,
+ "topic",
+ null))),
+ new KeyValue<>(
+ getBytes("asdf"),
+ new ContextualRecord("qwer".getBytes(UTF_8),
+ new ProcessorRecordContext(1,
+ 0,
+ 0,
+ "topic",
+ null)))
+ )));
+
+ cleanup(context, buffer);
+ }
+
+ @Test
+ public void shouldNotRestoreUnrecognizedVersionRecord() {
+ final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+ final MockInternalProcessorContext context = makeContext();
+ buffer.init(context, buffer);
+
+ final RecordBatchingStateRestoreCallback stateRestoreCallback =
+ (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
+
+ context.setRecordContext(new ProcessorRecordContext(0, 0, 0, ""));
+
+ final RecordHeaders unknownFlagHeaders = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) -1})});
+
+ final byte[] todeleteValue = getRecord("doomed", 0).serialize();
+ try {
+ stateRestoreCallback.restoreBatch(singletonList(
+ new ConsumerRecord<>("changelog-topic",
+ 0,
+ 0,
+ 999,
+ TimestampType.CREATE_TIME,
+ -1L,
+ -1,
+ -1,
+ "todelete".getBytes(UTF_8),
+ ByteBuffer.allocate(Long.BYTES + todeleteValue.length).putLong(0L).put(todeleteValue).array(),
+ unknownFlagHeaders)
+ ));
+ fail("expected an exception");
+ } catch (final IllegalArgumentException expected) {
+ // nothing to do.
+ } finally {
+ cleanup(context, buffer);
+ }
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
index 62a8491..b25aa9e 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
@@ -16,15 +16,100 @@
*/
package org.apache.kafka.test;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.MockProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
+import java.io.File;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static java.util.Collections.unmodifiableList;
+
public class MockInternalProcessorContext extends MockProcessorContext implements InternalProcessorContext {
+ /**
+  * Test double for {@link RecordCollector} that captures records in memory instead of producing them.
+  * Every record passed to the explicit-partition {@code send} overload is serialized and appended, in
+  * call order, to a list that tests can inspect through {@link #collected()}.
+  */
+ public static final class MockRecordCollector implements RecordCollector {
+     // Records captured by the explicit-partition send(), in the order they were sent.
+     private final List<ProducerRecord<byte[], byte[]>> collected = new LinkedList<>();
+
+     /**
+      * Serializes {@code key} and {@code value} with the supplied serializers (using {@code topic} as the
+      * serialization topic) and stores the resulting {@link ProducerRecord}. Nothing is sent anywhere.
+      *
+      * @param topic           topic recorded on the captured record
+      * @param key             key to serialize
+      * @param value           value to serialize
+      * @param headers         headers recorded on the captured record
+      * @param partition       partition recorded on the captured record
+      * @param timestamp       timestamp recorded on the captured record
+      * @param keySerializer   serializer applied to {@code key}
+      * @param valueSerializer serializer applied to {@code value}
+      */
+     @Override
+     public <K, V> void send(final String topic,
+                             final K key,
+                             final V value,
+                             final Headers headers,
+                             final Integer partition,
+                             final Long timestamp,
+                             final Serializer<K> keySerializer,
+                             final Serializer<V> valueSerializer) {
+         collected.add(new ProducerRecord<>(topic,
+                                            partition,
+                                            timestamp,
+                                            keySerializer.serialize(topic, key),
+                                            valueSerializer.serialize(topic, value),
+                                            headers));
+     }
+
+     /**
+      * Partitioner-based sends are not supported by this mock.
+      *
+      * @throws UnsupportedOperationException always
+      */
+     @Override
+     public <K, V> void send(final String topic,
+                             final K key,
+                             final V value,
+                             final Headers headers,
+                             final Long timestamp,
+                             final Serializer<K> keySerializer,
+                             final Serializer<V> valueSerializer,
+                             final StreamPartitioner<? super K, ? super V> partitioner) {
+         throw new UnsupportedOperationException(
+             "MockRecordCollector does not support sending with a StreamPartitioner");
+     }
+
+     @Override
+     public void init(final Producer<byte[], byte[]> producer) {
+         // no-op: the mock never uses a producer
+     }
+
+     @Override
+     public void flush() {
+         // no-op: records are captured synchronously in send()
+     }
+
+     @Override
+     public void close() {
+         // no-op: the mock holds no resources
+     }
+
+     /**
+      * This mock does not track offsets.
+      *
+      * @return a new empty map (never {@code null}), so callers can iterate or query it without a null check
+      */
+     @Override
+     public Map<TopicPartition, Long> offsets() {
+         return new LinkedHashMap<>();
+     }
+
+     /**
+      * @return an unmodifiable view of the records captured so far, in the order they were sent
+      */
+     public List<ProducerRecord<byte[], byte[]>> collected() {
+         return unmodifiableList(collected);
+     }
+ }
+
+ private final Map<String, StateRestoreCallback> restoreCallbacks = new LinkedHashMap<>();
private ProcessorNode currentNode;
+ private RecordCollector recordCollector;
+
+ /**
+  * Creates the mock context through the implicit no-argument superclass constructor.
+  */
+ public MockInternalProcessorContext() {
+ }
+
+ /**
+  * Creates the mock context, passing all three arguments straight through to the superclass constructor.
+  *
+  * @param config   forwarded unchanged to {@code super}
+  * @param taskId   forwarded unchanged to {@code super}
+  * @param stateDir forwarded unchanged to {@code super}
+  */
+ public MockInternalProcessorContext(final Properties config, final TaskId taskId, final File stateDir) {
+ super(config, taskId, stateDir);
+ }
@Override
public StreamsMetricsImpl metrics() {
@@ -68,7 +153,24 @@ public class MockInternalProcessorContext extends MockProcessorContext implement
}
@Override
- public void uninitialize() {
+ public void uninitialize() {}
+
+ /**
+  * Returns the collector most recently supplied through {@link #setRecordCollector(RecordCollector)}.
+  *
+  * @return the configured collector, or {@code null} if none has been set (the backing field has no initializer)
+  */
+ @Override
+ public RecordCollector recordCollector() {
+ return recordCollector;
+ }
+
+ /**
+  * Installs the collector that {@link #recordCollector()} will return from now on.
+  *
+  * @param recordCollector the collector to expose; replaces any previously set one
+  */
+ public void setRecordCollector(final RecordCollector recordCollector) {
+ this.recordCollector = recordCollector;
+ }
+
+ /**
+  * Remembers the restore callback under the store's name so it can be fetched later with
+  * {@link #stateRestoreCallback(String)}, then delegates the registration to the superclass.
+  *
+  * @param store                the store being registered; its {@code name()} is the lookup key
+  * @param stateRestoreCallback the callback to remember and to pass on to {@code super.register}
+  */
+ @Override
+ public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) {
+ // A later registration under the same store name replaces the earlier callback.
+ restoreCallbacks.put(store.name(), stateRestoreCallback);
+ super.register(store, stateRestoreCallback);
+ }
+ /**
+  * Looks up the restore callback captured by {@link #register(StateStore, StateRestoreCallback)}.
+  *
+  * @param storeName name of the store whose callback is wanted
+  * @return the callback stored under that name, or {@code null} if nothing was registered under it
+  */
+ public StateRestoreCallback stateRestoreCallback(final String storeName) {
+ return restoreCallbacks.get(storeName);
}
}
\ No newline at end of file