You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2023/04/14 15:30:21 UTC
[kafka] branch 3.5 updated: MINOR: Refactor changelogger to accept timestamp (#13563)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 3.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.5 by this push:
new 904650e3a15 MINOR: Refactor changelogger to accept timestamp (#13563)
904650e3a15 is described below
commit 904650e3a15614aa71dfe49ae75cee87b843ef86
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Fri Apr 14 07:14:10 2023 -0700
MINOR: Refactor changelogger to accept timestamp (#13563)
Reviewers: Bill Bejeck <bi...@confluent.io>
---
.../internals/ChangeLoggingKeyValueBytesStore.java | 14 +++++-----
.../ChangeLoggingListValueBytesStore.java | 4 +--
...ChangeLoggingTimestampedKeyValueBytesStore.java | 32 ++++++++++++++++++----
.../ChangeLoggingTimestampedWindowBytesStore.java | 12 ++++----
.../ChangeLoggingVersionedKeyValueBytesStore.java | 8 ++----
...ValueToTimestampedKeyValueByteStoreAdapter.java | 6 ++--
.../internals/ValueAndTimestampDeserializer.java | 4 +++
.../WindowToTimestampedWindowByteStoreAdapter.java | 2 +-
8 files changed, 52 insertions(+), 30 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index e867b278f5b..379554aee07 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -62,7 +62,7 @@ public class ChangeLoggingKeyValueBytesStore
if (wrapped() instanceof MemoryLRUCache) {
((MemoryLRUCache) wrapped()).setWhenEldestRemoved((key, value) -> {
// pass null to indicate removal
- log(key, null);
+ log(key, null, context.timestamp());
});
}
}
@@ -76,7 +76,7 @@ public class ChangeLoggingKeyValueBytesStore
public void put(final Bytes key,
final byte[] value) {
wrapped().put(key, value);
- log(key, value);
+ log(key, value, context.timestamp());
}
@Override
@@ -85,7 +85,7 @@ public class ChangeLoggingKeyValueBytesStore
final byte[] previous = wrapped().putIfAbsent(key, value);
if (previous == null) {
// then it was absent
- log(key, value);
+ log(key, value, context.timestamp());
}
return previous;
}
@@ -94,7 +94,7 @@ public class ChangeLoggingKeyValueBytesStore
public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
wrapped().putAll(entries);
for (final KeyValue<Bytes, byte[]> entry : entries) {
- log(entry.key, entry.value);
+ log(entry.key, entry.value, context.timestamp());
}
}
@@ -107,7 +107,7 @@ public class ChangeLoggingKeyValueBytesStore
@Override
public byte[] delete(final Bytes key) {
final byte[] oldValue = wrapped().delete(key);
- log(key, null);
+ log(key, null, context.timestamp());
return oldValue;
}
@@ -138,7 +138,7 @@ public class ChangeLoggingKeyValueBytesStore
return wrapped().reverseAll();
}
- void log(final Bytes key, final byte[] value) {
- context.logChange(name(), key, value, context.timestamp(), wrapped().getPosition());
+ void log(final Bytes key, final byte[] value, final long timestamp) {
+ context.logChange(name(), key, value, timestamp, wrapped().getPosition());
}
}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java
index c01594b222c..be5805c53fc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java
@@ -32,9 +32,9 @@ public class ChangeLoggingListValueBytesStore extends ChangeLoggingKeyValueBytes
// we need to log the full new list and thus call get() on the inner store below
// if the value is a tombstone, we delete the whole list and thus can save the get call
if (value == null) {
- log(key, null);
+ log(key, null, context.timestamp());
} else {
- log(key, wrapped().get(key));
+ log(key, wrapped().get(key), context.timestamp());
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
index 71a1d0a9e14..efb265ee44d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
@@ -17,8 +17,11 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueStore;
+import java.util.List;
+
import static org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.rawValue;
import static org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.timestamp;
@@ -29,12 +32,29 @@ public class ChangeLoggingTimestampedKeyValueBytesStore extends ChangeLoggingKey
}
@Override
- void log(final Bytes key,
- final byte[] valueAndTimestamp) {
- if (valueAndTimestamp != null) {
- context.logChange(name(), key, rawValue(valueAndTimestamp), timestamp(valueAndTimestamp), wrapped().getPosition());
- } else {
- context.logChange(name(), key, null, context.timestamp(), wrapped().getPosition());
+ public void put(final Bytes key,
+ final byte[] valueAndTimestamp) {
+ wrapped().put(key, valueAndTimestamp);
+ log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? context.timestamp() : timestamp(valueAndTimestamp));
+ }
+
+ @Override
+ public byte[] putIfAbsent(final Bytes key,
+ final byte[] valueAndTimestamp) {
+ final byte[] previous = wrapped().putIfAbsent(key, valueAndTimestamp);
+ if (previous == null) {
+ // then it was absent
+ log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? context.timestamp() : timestamp(valueAndTimestamp));
+ }
+ return previous;
+ }
+
+ @Override
+ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+ wrapped().putAll(entries);
+ for (final KeyValue<Bytes, byte[]> entry : entries) {
+ final byte[] valueAndTimestamp = entry.value;
+ log(entry.key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? context.timestamp() : timestamp(valueAndTimestamp));
}
}
}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
index 75c18bc3442..aea84170733 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
@@ -32,10 +32,12 @@ class ChangeLoggingTimestampedWindowBytesStore extends ChangeLoggingWindowBytesS
@Override
void log(final Bytes key,
final byte[] valueAndTimestamp) {
- if (valueAndTimestamp != null) {
- context.logChange(name(), key, rawValue(valueAndTimestamp), timestamp(valueAndTimestamp), wrapped().getPosition());
- } else {
- context.logChange(name(), key, null, context.timestamp(), wrapped().getPosition());
- }
+ context.logChange(
+ name(),
+ key,
+ rawValue(valueAndTimestamp),
+ valueAndTimestamp != null ? timestamp(valueAndTimestamp) : context.timestamp(),
+ wrapped().getPosition()
+ );
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingVersionedKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingVersionedKeyValueBytesStore.java
index 030b1ce40c1..f9f3e79b54e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingVersionedKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingVersionedKeyValueBytesStore.java
@@ -56,11 +56,7 @@ public class ChangeLoggingVersionedKeyValueBytesStore extends ChangeLoggingKeyVa
}
@Override
- void log(final Bytes key, final byte[] rawValueAndTimestamp) {
- throw new IllegalStateException("versioned changelogging layer should call log(key, value, timestamp) instead");
- }
-
- private void log(final Bytes key, final byte[] value, final long timestamp) {
+ public void log(final Bytes key, final byte[] value, final long timestamp) {
context.logChange(
name(),
key,
@@ -69,4 +65,4 @@ public class ChangeLoggingVersionedKeyValueBytesStore extends ChangeLoggingKeyVa
wrapped().getPosition()
);
}
-}
\ No newline at end of file
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
index 3db8b7403a0..03506543c2b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
@@ -59,7 +59,7 @@ public class KeyValueToTimestampedKeyValueByteStoreAdapter implements KeyValueSt
@Override
public void put(final Bytes key,
final byte[] valueWithTimestamp) {
- store.put(key, valueWithTimestamp == null ? null : rawValue(valueWithTimestamp));
+ store.put(key, rawValue(valueWithTimestamp));
}
@Override
@@ -67,14 +67,14 @@ public class KeyValueToTimestampedKeyValueByteStoreAdapter implements KeyValueSt
final byte[] valueWithTimestamp) {
return convertToTimestampedFormat(store.putIfAbsent(
key,
- valueWithTimestamp == null ? null : rawValue(valueWithTimestamp)));
+ rawValue(valueWithTimestamp)));
}
@Override
public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
for (final KeyValue<Bytes, byte[]> entry : entries) {
final byte[] valueWithTimestamp = entry.value;
- store.put(entry.key, valueWithTimestamp == null ? null : rawValue(valueWithTimestamp));
+ store.put(entry.key, rawValue(valueWithTimestamp));
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializer.java
index 935c0f06e32..5ea4225c4c9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializer.java
@@ -66,6 +66,10 @@ class ValueAndTimestampDeserializer<V> implements WrappingNullableDeserializer<V
}
static byte[] rawValue(final byte[] rawValueAndTimestamp) {
+ if (rawValueAndTimestamp == null) {
+ return null;
+ }
+
final int rawValueLength = rawValueAndTimestamp.length - 8;
return ByteBuffer
.allocate(rawValueLength)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
index 3c86c5aac88..733d4b99683 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
@@ -49,7 +49,7 @@ class WindowToTimestampedWindowByteStoreAdapter implements WindowStore<Bytes, by
public void put(final Bytes key,
final byte[] valueWithTimestamp,
final long windowStartTimestamp) {
- store.put(key, valueWithTimestamp == null ? null : rawValue(valueWithTimestamp), windowStartTimestamp);
+ store.put(key, rawValue(valueWithTimestamp), windowStartTimestamp);
}
@Override