You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/05/12 18:20:09 UTC
[kafka] branch trunk updated: KAFKA-8770: KIP-557: Drop idempotent
KTable source updates (#8254)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f54cece KAFKA-8770: KIP-557: Drop idempotent KTable source updates (#8254)
f54cece is described below
commit f54cece73e6116566979bcf6a865d803b7c18974
Author: Richard Yu <yo...@gmail.com>
AuthorDate: Tue May 12 11:19:32 2020 -0700
KAFKA-8770: KIP-557: Drop idempotent KTable source updates (#8254)
Drops idempotent updates from KTable source operators.
Specifically, drop updates in which the value is unchanged,
and the timestamp is the same or larger.
Implements: KIP-557
Reviewers: Bruno Cadonna <br...@confluent.io>, John Roesler <vv...@apache.org>
---
.../streams/kstream/internals/KTableSource.java | 37 +++++++++++++---
.../internals/metrics/ProcessorNodeMetrics.java | 22 ++++++++++
.../state/internals/MeteredKeyValueStore.java | 8 ++--
.../internals/MeteredTimestampedKeyValueStore.java | 51 +++++++++++++++++++++-
.../internals/ValueAndTimestampSerializer.java | 45 +++++++++++++++++++
.../KTableKTableForeignKeyJoinIntegrationTest.java | 18 ++++++--
.../KTableSourceTopicRestartIntegrationTest.java | 1 +
.../integration/RestoreIntegrationTest.java | 13 +++---
.../kstream/internals/KTableSourceTest.java | 43 ++++++++++++++++++
.../internals/KTableTransformValuesTest.java | 4 +-
.../kstream/internals/SuppressScenarioTest.java | 19 ++++----
.../metrics/ProcessorNodeMetricsTest.java | 22 ++++++++++
.../MeteredTimestampedKeyValueStoreTest.java | 48 ++++++++++++++++++++
.../internals/ValueAndTimestampSerializerTest.java | 24 ++++++++++
14 files changed, 322 insertions(+), 33 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index b9f3580..f6756f5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -21,15 +21,20 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore;
+import org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.RawAndDeserializedValue;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.skippedIdempotentUpdatesSensor;
public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
private static final Logger LOG = LoggerFactory.getLogger(KTableSource.class);
@@ -74,10 +79,11 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
private class KTableSourceProcessor extends AbstractProcessor<K, V> {
- private TimestampedKeyValueStore<K, V> store;
+ private MeteredTimestampedKeyValueStore<K, V> store;
private TimestampedTupleForwarder<K, V> tupleForwarder;
private StreamsMetricsImpl metrics;
private Sensor droppedRecordsSensor;
+ private Sensor skippedIdempotentUpdatesSensor = null;
@SuppressWarnings("unchecked")
@Override
@@ -86,12 +92,24 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
metrics = (StreamsMetricsImpl) context.metrics();
droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
if (queryableName != null) {
- store = (TimestampedKeyValueStore<K, V>) context.getStateStore(queryableName);
+ final StateStore stateStore = context.getStateStore(queryableName);
+ try {
+ store = ((WrappedStateStore<MeteredTimestampedKeyValueStore<K, V>, K, V>) stateStore).wrapped();
+ } catch (final ClassCastException e) {
+ throw new IllegalStateException("Unexpected store type: " + stateStore.getClass() + " for store: " + queryableName, e);
+ }
tupleForwarder = new TimestampedTupleForwarder<>(
store,
context,
new TimestampedCacheFlushListener<>(context),
sendOldValues);
+ skippedIdempotentUpdatesSensor = skippedIdempotentUpdatesSensor(
+ Thread.currentThread().getName(),
+ context.taskId().toString(),
+ ((InternalProcessorContext) context).currentNode().name(),
+ metrics
+ );
+
}
}
@@ -108,7 +126,8 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
}
if (queryableName != null) {
- final ValueAndTimestamp<V> oldValueAndTimestamp = store.get(key);
+ final RawAndDeserializedValue<V> tuple = store.getWithBinary(key);
+ final ValueAndTimestamp<V> oldValueAndTimestamp = tuple.value;
final V oldValue;
if (oldValueAndTimestamp != null) {
oldValue = oldValueAndTimestamp.value();
@@ -119,8 +138,14 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
} else {
oldValue = null;
}
- store.put(key, ValueAndTimestamp.make(value, context().timestamp()));
- tupleForwarder.maybeForward(key, value, oldValue);
+ final ValueAndTimestamp<V> newValueAndTimestamp = ValueAndTimestamp.make(value, context().timestamp());
+ final boolean isDifferentValue =
+ store.putIfDifferentValues(key, newValueAndTimestamp, tuple.serializedValue);
+ if (isDifferentValue) {
+ tupleForwarder.maybeForward(key, value, oldValue);
+ } else {
+ skippedIdempotentUpdatesSensor.record();
+ }
} else {
context().forward(key, new Change<>(value, null));
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java
index b495f7f..c157d83 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java
@@ -47,6 +47,12 @@ public class ProcessorNodeMetrics {
private static final String SUPPRESSION_EMIT_RATE_DESCRIPTION =
RATE_DESCRIPTION_PREFIX + SUPPRESSION_EMIT_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;
+ private static final String IDEMPOTENT_UPDATE_SKIP = "idempotent-update-skip";
+ private static final String IDEMPOTENT_UPDATE_SKIP_DESCRIPTION = "skipped idempotent updates";
+ private static final String IDEMPOTENT_UPDATE_SKIP_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + IDEMPOTENT_UPDATE_SKIP_DESCRIPTION;
+ private static final String IDEMPOTENT_UPDATE_SKIP_RATE_DESCRIPTION =
+ RATE_DESCRIPTION_PREFIX + IDEMPOTENT_UPDATE_SKIP_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;
+
private static final String PROCESS = "process";
private static final String PROCESS_DESCRIPTION = "calls to process";
private static final String PROCESS_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + PROCESS_DESCRIPTION;
@@ -108,6 +114,22 @@ public class ProcessorNodeMetrics {
);
}
+ public static Sensor skippedIdempotentUpdatesSensor(final String threadId,
+ final String taskId,
+ final String processorNodeId,
+ final StreamsMetricsImpl streamsMetrics) {
+ return throughputSensor(
+ threadId,
+ taskId,
+ processorNodeId,
+ IDEMPOTENT_UPDATE_SKIP,
+ IDEMPOTENT_UPDATE_SKIP_RATE_DESCRIPTION,
+ IDEMPOTENT_UPDATE_SKIP_TOTAL_DESCRIPTION,
+ RecordingLevel.DEBUG,
+ streamsMetrics
+ );
+ }
+
public static Sensor processSensor(final String threadId,
final String taskId,
final String processorNodeId,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 6076702..c844e03 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -54,9 +54,9 @@ public class MeteredKeyValueStore<K, V>
private final String metricsScope;
protected final Time time;
- private Sensor putSensor;
+ protected Sensor putSensor;
private Sensor putIfAbsentSensor;
- private Sensor getSensor;
+ protected Sensor getSensor;
private Sensor deleteSensor;
private Sensor putAllSensor;
private Sensor allSensor;
@@ -206,11 +206,11 @@ public class MeteredKeyValueStore<K, V>
}
}
- private V outerValue(final byte[] value) {
+ protected V outerValue(final byte[] value) {
return value != null ? serdes.valueFrom(value) : null;
}
- private Bytes keyBytes(final K key) {
+ protected Bytes keyBytes(final K key) {
return Bytes.wrap(serdes.rawKey(key));
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
index 468b554..d1446dc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
@@ -16,9 +16,12 @@
*/
package org.apache.kafka.streams.state.internals;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -35,7 +38,7 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
* @param <V>
*/
public class MeteredTimestampedKeyValueStore<K, V>
- extends MeteredKeyValueStore<K, ValueAndTimestamp<V>>
+ extends MeteredKeyValueStore<K, ValueAndTimestamp<V>>
implements TimestampedKeyValueStore<K, V> {
MeteredTimestampedKeyValueStore(final KeyValueStore<Bytes, byte[]> inner,
@@ -53,4 +56,48 @@ public class MeteredTimestampedKeyValueStore<K, V>
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde);
}
-}
\ No newline at end of file
+
+ public RawAndDeserializedValue<V> getWithBinary(final K key) {
+ try {
+ return maybeMeasureLatency(() -> {
+ final byte[] serializedValue = wrapped().get(keyBytes(key));
+ return new RawAndDeserializedValue<V>(serializedValue, outerValue(serializedValue));
+ }, time, getSensor);
+ } catch (final ProcessorStateException e) {
+ final String message = String.format(e.getMessage(), key);
+ throw new ProcessorStateException(message, e);
+ }
+ }
+
+ public boolean putIfDifferentValues(final K key,
+ final ValueAndTimestamp<V> newValue,
+ final byte[] oldSerializedValue) {
+ try {
+ return maybeMeasureLatency(
+ () -> {
+ final byte[] newSerializedValue = serdes.rawValue(newValue);
+ if (ValueAndTimestampSerializer.compareValuesAndCheckForIncreasingTimestamp(oldSerializedValue, newSerializedValue)) {
+ return false;
+ } else {
+ wrapped().put(keyBytes(key), newSerializedValue);
+ return true;
+ }
+ },
+ time,
+ putSensor
+ );
+ } catch (final ProcessorStateException e) {
+ final String message = String.format(e.getMessage(), key, newValue);
+ throw new ProcessorStateException(message, e);
+ }
+ }
+
+ public static class RawAndDeserializedValue<ValueType> {
+ public final byte[] serializedValue;
+ public final ValueAndTimestamp<ValueType> value;
+ public RawAndDeserializedValue(final byte[] serializedValue, final ValueAndTimestamp<ValueType> value) {
+ this.serializedValue = serializedValue;
+ this.value = value;
+ }
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java
index 614a0f5..3b2663d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java
@@ -34,6 +34,51 @@ public class ValueAndTimestampSerializer<V> implements Serializer<ValueAndTimest
timestampSerializer = new LongSerializer();
}
+ private static boolean skipTimestampAndCompareValues(final byte[] left, final byte[] right) {
+ for (int i = Long.BYTES; i < left.length; i++) {
+ if (left[i] != right[i]) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static long extractTimestamp(final byte[] bytes) {
+ final byte[] timestampBytes = new byte[Long.BYTES];
+ for (int i = 0; i < Long.BYTES; i++) {
+ timestampBytes[i] = bytes[i];
+ }
+ return ByteBuffer.wrap(timestampBytes).getLong();
+ }
+
+ /**
+ * @param left the serialized byte array of the old record in state store
+ * @param right the serialized byte array of the new record being processed
+ * @return true if the two serialized values are the same (excluding timestamp) and
+ * the timestamp of right is not less than that of left (i.e. the new record is not out of order);
+ * false otherwise
+ */
+ public static boolean compareValuesAndCheckForIncreasingTimestamp(final byte[] left, final byte[] right) {
+ if (left == right) {
+ return true;
+ }
+ if (left == null || right == null) {
+ return false;
+ }
+
+ final int length = left.length;
+ if (right.length != length) {
+ return false;
+ }
+
+ final long leftTimestamp = extractTimestamp(left);
+ final long rightTimestamp = extractTimestamp(right);
+ if (rightTimestamp < leftTimestamp) {
+ return false;
+ }
+ return skipTimestampAndCompareValues(left, right);
+ }
+
@Override
public void configure(final Map<String, ?> configs,
final boolean isKey) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index 34cf1c1..5ebd9ef 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -49,6 +49,8 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;
+import static java.util.Collections.singletonMap;
+
import static java.util.Collections.emptyMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -386,12 +388,16 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
// Deleting a non-joining record produces an unnecessary tombstone for inner joins, because
// it's not possible to know whether a result was previously emitted.
+ // HOWEVER, when the final join result is materialized (either explicitly or
+ // implicitly by a subsequent join), we _can_ detect that the tombstone is unnecessary and drop it.
// For the left join, the tombstone is necessary.
left.pipeInput("lhs1", (String) null);
{
assertThat(
outputTopic.readKeyValuesToMap(),
- is(mkMap(mkEntry("lhs1", null)))
+ is(leftJoin || !(materialized || rejoin)
+ ? mkMap(mkEntry("lhs1", null))
+ : emptyMap())
);
if (materialized) {
assertThat(
@@ -468,11 +474,15 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
// "moving" our subscription to another non-existent FK results in an unnecessary tombstone for inner join,
// since it impossible to know whether the prior FK existed or not (and thus whether any results have
// previously been emitted)
+ // HOWEVER, when the final join result is materialized (either explicitly or
+ // implicitly by a subsequent join), we _can_ detect that the tombstone is unnecessary and drop it.
// The left join emits a _necessary_ update (since the lhs record has actually changed)
left.pipeInput("lhs1", "lhsValue1|rhs2");
assertThat(
outputTopic.readKeyValuesToMap(),
- is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs2,null)" : null)))
+ is(leftJoin
+ ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs2,null)"))
+ : (materialized || rejoin) ? emptyMap() : singletonMap("lhs1", null))
);
if (materialized) {
assertThat(
@@ -484,7 +494,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
left.pipeInput("lhs1", "lhsValue1|rhs3");
assertThat(
outputTopic.readKeyValuesToMap(),
- is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs3,null)" : null)))
+ is(leftJoin
+ ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs3,null)"))
+ : (materialized || rejoin) ? emptyMap() : singletonMap("lhs1", null))
);
if (materialized) {
assertThat(
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
index 1cb154a..0a67f41 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
@@ -107,6 +107,7 @@ public class KTableSourceTopicRestartIntegrationTest {
@After
public void after() throws Exception {
IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
+ CLUSTER.deleteAllTopicsAndWait(60000L);
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 9cb0b85..160313a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -127,7 +127,7 @@ public class RestoreIntegrationTest {
// restoring from 1000 to 4000 (committed), and then process from 4000 to 5000 on each of the two partitions
final int offsetLimitDelta = 1000;
final int offsetCheckpointed = 1000;
- createStateForRestoration(INPUT_STREAM);
+ createStateForRestoration(INPUT_STREAM, 0);
setCommittedOffset(INPUT_STREAM, offsetLimitDelta);
final StateDirectory stateDirectory = new StateDirectory(new StreamsConfig(props), new MockTime(), true);
@@ -143,7 +143,7 @@ public class RestoreIntegrationTest {
builder.table(INPUT_STREAM, Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as("store").withKeySerde(Serdes.Integer()).withValueSerde(Serdes.Integer()))
.toStream()
.foreach((key, value) -> {
- if (numReceived.incrementAndGet() == 2 * offsetLimitDelta) {
+ if (numReceived.incrementAndGet() == offsetLimitDelta * 2) {
shutdownLatch.countDown();
}
});
@@ -190,8 +190,8 @@ public class RestoreIntegrationTest {
// restoring from 1000 to 5000, and then process from 5000 to 10000 on each of the two partitions
final int offsetCheckpointed = 1000;
- createStateForRestoration(APPID + "-store-changelog");
- createStateForRestoration(INPUT_STREAM);
+ createStateForRestoration(APPID + "-store-changelog", 0);
+ createStateForRestoration(INPUT_STREAM, 10000);
final StateDirectory stateDirectory = new StateDirectory(new StreamsConfig(props), new MockTime(), true);
// note here the checkpointed offset is the last processed record's offset, so without control message we should write this offset - 1
@@ -345,7 +345,7 @@ public class RestoreIntegrationTest {
public void close() { }
}
- private void createStateForRestoration(final String changelogTopic) {
+ private void createStateForRestoration(final String changelogTopic, final int startingOffset) {
final Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
@@ -353,7 +353,8 @@ public class RestoreIntegrationTest {
new KafkaProducer<>(producerConfig, new IntegerSerializer(), new IntegerSerializer())) {
for (int i = 0; i < numberOfKeys; i++) {
- producer.send(new ProducerRecord<>(changelogTopic, i, i));
+ final int offset = startingOffset + i;
+ producer.send(new ProducerRecord<>(changelogTopic, offset, offset));
}
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 236ea58..ae512b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -16,8 +16,10 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
@@ -33,6 +35,9 @@ import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.streams.TestOutputTopic;
+
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
@@ -87,6 +92,44 @@ public class KTableSourceTest {
}
@Test
+ public void testKTableSourceEmitOnChange() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final String topic1 = "topic1";
+
+ builder.table(topic1, Consumed.with(Serdes.String(), Serdes.Integer()), Materialized.as("store"))
+ .toStream()
+ .to("output");
+
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ final TestInputTopic<String, Integer> inputTopic =
+ driver.createInputTopic(topic1, new StringSerializer(), new IntegerSerializer());
+ final TestOutputTopic<String, Integer> outputTopic =
+ driver.createOutputTopic("output", new StringDeserializer(), new IntegerDeserializer());
+
+ inputTopic.pipeInput("A", 1, 10L);
+ inputTopic.pipeInput("B", 2, 11L);
+ inputTopic.pipeInput("A", 1, 12L);
+ inputTopic.pipeInput("B", 3, 13L);
+ // this record should be kept since this is out of order, so the timestamp
+ // should be updated in this scenario
+ inputTopic.pipeInput("A", 1, 9L);
+
+ assertEquals(
+ 1.0,
+ getMetricByName(driver.metrics(), "idempotent-update-skip-total", "stream-processor-node-metrics").metricValue()
+ );
+
+ assertEquals(
+ asList(new TestRecord<>("A", 1, Instant.ofEpochMilli(10L)),
+ new TestRecord<>("B", 2, Instant.ofEpochMilli(11L)),
+ new TestRecord<>("B", 3, Instant.ofEpochMilli(13L)),
+ new TestRecord<>("A", 1, Instant.ofEpochMilli(9L))),
+ outputTopic.readRecordsToList()
+ );
+ }
+ }
+
+ @Test
public void kTableShouldLogAndMeterOnSkippedRecordsWithBuiltInMetrics0100To24() {
kTableShouldLogAndMeterOnSkippedRecords(StreamsConfig.METRICS_0100_TO_24);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index 80cdae4..a48dcdb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -398,8 +398,8 @@ public class KTableTransformValuesTest {
driver.createInputTopic(INPUT_TOPIC, new StringSerializer(), new StringSerializer());
inputTopic.pipeInput("A", "ignored", 5L);
- inputTopic.pipeInput("A", "ignored", 15L);
- inputTopic.pipeInput("A", "ignored", 10L);
+ inputTopic.pipeInput("A", "ignored1", 15L);
+ inputTopic.pipeInput("A", "ignored2", 10L);
assertThat(output(), hasItems(new KeyValueTimestamp<>("A", "1", 5),
new KeyValueTimestamp<>("A", "0", 15),
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
index 55f8670..29eb539 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -138,19 +138,19 @@ public class SuppressScenarioTest {
new KeyValueTimestamp<>("x", 1L, 3L)
)
);
- inputTopic.pipeInput("x", "x", 4L);
+ inputTopic.pipeInput("x", "y", 4L);
verify(
drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
asList(
new KeyValueTimestamp<>("x", 0L, 4L),
- new KeyValueTimestamp<>("x", 1L, 4L)
+ new KeyValueTimestamp<>("y", 1L, 4L)
)
);
verify(
drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
asList(
new KeyValueTimestamp<>("x", 0L, 4L),
- new KeyValueTimestamp<>("x", 1L, 4L)
+ new KeyValueTimestamp<>("y", 1L, 4L)
)
);
}
@@ -209,12 +209,12 @@ public class SuppressScenarioTest {
);
- inputTopic.pipeInput("tick", "tick", 4L);
+ inputTopic.pipeInput("tick", "tock", 4L);
verify(
drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
asList(
new KeyValueTimestamp<>("tick", 0L, 4L),
- new KeyValueTimestamp<>("tick", 1L, 4L)
+ new KeyValueTimestamp<>("tock", 1L, 4L)
)
);
// tick is still buffered, since it was first inserted at time 3, and it is only time 4 right now.
@@ -614,11 +614,11 @@ public class SuppressScenarioTest {
);
- inputTopicRight.pipeInput("tick", "tick", 21L);
+ inputTopicRight.pipeInput("tick", "tick1", 21L);
verify(
drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
asList(
- new KeyValueTimestamp<>("tick", "(null,tick)", 21), // just a testing artifact
+ new KeyValueTimestamp<>("tick", "(null,tick1)", 21), // just a testing artifact
new KeyValueTimestamp<>("A", "(b,2)", 13L)
)
);
@@ -644,7 +644,6 @@ public class SuppressScenarioTest {
.to("output", Produced.with(Serdes.String(), Serdes.String()));
final Topology topology = builder.build();
- System.out.println(topology.describe());
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
final TestInputTopic<String, String> inputTopicRight =
driver.createInputTopic("right", STRING_SERIALIZER, STRING_SERIALIZER);
@@ -703,11 +702,11 @@ public class SuppressScenarioTest {
);
- inputTopicLeft.pipeInput("tick", "tick", 21L);
+ inputTopicLeft.pipeInput("tick", "tick1", 21L);
verify(
drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
asList(
- new KeyValueTimestamp<>("tick", "(tick,null)", 21), // just a testing artifact
+ new KeyValueTimestamp<>("tick", "(tick1,null)", 21), // just a testing artifact
new KeyValueTimestamp<>("A", "(2,b)", 13L)
)
);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java
index 9778db8..8563167 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java
@@ -98,6 +98,28 @@ public class ProcessorNodeMetricsTest {
}
@Test
+ public void shouldGetIdempotentUpdateSkipSensor() {
+ final String metricNamePrefix = "idempotent-update-skip";
+ final String descriptionOfCount = "The total number of skipped idempotent updates";
+ final String descriptionOfRate = "The average number of skipped idempotent updates per second";
+ expect(streamsMetrics.nodeLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, metricNamePrefix, RecordingLevel.DEBUG))
+ .andReturn(expectedSensor);
+ expect(streamsMetrics.nodeLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID)).andReturn(tagMap);
+ StreamsMetricsImpl.addInvocationRateAndCountToSensor(
+ expectedSensor,
+ StreamsMetricsImpl.PROCESSOR_NODE_LEVEL_GROUP,
+ tagMap,
+ metricNamePrefix,
+ descriptionOfRate,
+ descriptionOfCount
+ );
+
+ verifySensor(
+ () -> ProcessorNodeMetrics.skippedIdempotentUpdatesSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, streamsMetrics)
+ );
+ }
+
+ @Test
public void shouldGetProcessSensor() {
final String metricNamePrefix = "process";
final String descriptionOfCount = "The total number of calls to process";
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index 0f1795c..b32c331 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.RawAndDeserializedValue;
import org.apache.kafka.test.KeyValueIteratorStub;
import org.easymock.EasyMockRule;
import org.easymock.Mock;
@@ -65,6 +66,7 @@ import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -182,6 +184,52 @@ public class MeteredTimestampedKeyValueStoreTest {
}
@Test
+ public void shouldGetWithBinary() {
+ expect(inner.get(keyBytes)).andReturn(valueAndTimestampBytes);
+
+ inner.put(eq(keyBytes), aryEq(valueAndTimestampBytes));
+ expectLastCall();
+ init();
+
+ final RawAndDeserializedValue<String> valueWithBinary = metered.getWithBinary(key);
+ assertEquals(valueWithBinary.value, valueAndTimestamp);
+ assertEquals(valueWithBinary.serializedValue, valueAndTimestampBytes);
+ }
+
+ @SuppressWarnings("resource")
+ @Test
+ public void shouldNotPutIfSameValuesAndGreaterTimestamp() {
+ init();
+
+ metered.put(key, valueAndTimestamp);
+ final ValueAndTimestampSerde<String> stringSerde = new ValueAndTimestampSerde<>(Serdes.String());
+ final byte[] encodedOldValue = stringSerde.serializer().serialize("TOPIC", valueAndTimestamp);
+
+ final ValueAndTimestamp<String> newValueAndTimestamp = ValueAndTimestamp.make("value", 98L);
+ assertFalse(metered.putIfDifferentValues(key,
+ newValueAndTimestamp,
+ encodedOldValue));
+ verify(inner);
+ }
+
+ @SuppressWarnings("resource")
+ @Test
+ public void shouldPutIfOutOfOrder() {
+ inner.put(eq(keyBytes), aryEq(valueAndTimestampBytes));
+ expectLastCall();
+ init();
+
+ metered.put(key, valueAndTimestamp);
+
+ final ValueAndTimestampSerde<String> stringSerde = new ValueAndTimestampSerde<>(Serdes.String());
+ final byte[] encodedOldValue = stringSerde.serializer().serialize("TOPIC", valueAndTimestamp);
+
+ final ValueAndTimestamp<String> outOfOrderValueAndTimestamp = ValueAndTimestamp.make("value", 95L);
+ assertTrue(metered.putIfDifferentValues(key, outOfOrderValueAndTimestamp, encodedOldValue));
+ verify(inner);
+ }
+
+ @Test
public void shouldGetBytesFromInnerStoreAndReturnGetMetric() {
expect(inner.get(keyBytes)).andReturn(valueAndTimestampBytes);
init();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializerTest.java
index 63a5661..c22221e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializerTest.java
@@ -25,6 +25,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
public class ValueAndTimestampSerializerTest {
private static final String TOPIC = "some-topic";
@@ -51,6 +53,28 @@ public class ValueAndTimestampSerializerTest {
}
@Test
+ public void shouldDropSerializedValueIfEqualWithGreaterTimestamp() {
+ final String value = "food";
+
+ final ValueAndTimestamp<String> oldValueAndTimestamp = ValueAndTimestamp.make(value, TIMESTAMP);
+ final byte[] oldSerializedValue = STRING_SERDE.serializer().serialize(TOPIC, oldValueAndTimestamp);
+ final ValueAndTimestamp<String> newValueAndTimestamp = ValueAndTimestamp.make(value, TIMESTAMP + 1);
+ final byte[] newSerializedValue = STRING_SERDE.serializer().serialize(TOPIC, newValueAndTimestamp);
+ assertTrue(ValueAndTimestampSerializer.compareValuesAndCheckForIncreasingTimestamp(oldSerializedValue, newSerializedValue));
+ }
+
+ @Test
+ public void shouldKeepSerializedValueIfOutOfOrder() {
+ final String value = "balls";
+
+ final ValueAndTimestamp<String> oldValueAndTimestamp = ValueAndTimestamp.make(value, TIMESTAMP);
+ final byte[] oldSerializedValue = STRING_SERDE.serializer().serialize(TOPIC, oldValueAndTimestamp);
+ final ValueAndTimestamp<String> outOfOrderValueAndTimestamp = ValueAndTimestamp.make(value, TIMESTAMP - 1);
+ final byte[] outOfOrderSerializedValue = STRING_SERDE.serializer().serialize(TOPIC, outOfOrderValueAndTimestamp);
+ assertFalse(ValueAndTimestampSerializer.compareValuesAndCheckForIncreasingTimestamp(oldSerializedValue, outOfOrderSerializedValue));
+ }
+
+ @Test
public void shouldSerializeNullDataAsNull() {
final byte[] serialized =
STRING_SERDE.serializer().serialize(TOPIC, ValueAndTimestamp.make(null, TIMESTAMP));