You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@kafka.apache.org by mj...@apache.org on 2023/04/14 15:30:21 UTC

[kafka] branch 3.5 updated: MINOR: Refactor changelogger to accept timestamp (#13563)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.5 by this push:
     new 904650e3a15 MINOR: Refactor changelogger to accept timestamp (#13563)
904650e3a15 is described below

commit 904650e3a15614aa71dfe49ae75cee87b843ef86
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Fri Apr 14 07:14:10 2023 -0700

    MINOR: Refactor changelogger to accept timestamp (#13563)
    
    Reviewers: Bill Bejeck <bi...@confluent.io>
---
 .../internals/ChangeLoggingKeyValueBytesStore.java | 14 +++++-----
 .../ChangeLoggingListValueBytesStore.java          |  4 +--
 ...ChangeLoggingTimestampedKeyValueBytesStore.java | 32 ++++++++++++++++++----
 .../ChangeLoggingTimestampedWindowBytesStore.java  | 12 ++++----
 .../ChangeLoggingVersionedKeyValueBytesStore.java  |  8 ++----
 ...ValueToTimestampedKeyValueByteStoreAdapter.java |  6 ++--
 .../internals/ValueAndTimestampDeserializer.java   |  4 +++
 .../WindowToTimestampedWindowByteStoreAdapter.java |  2 +-
 8 files changed, 52 insertions(+), 30 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index e867b278f5b..379554aee07 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -62,7 +62,7 @@ public class ChangeLoggingKeyValueBytesStore
         if (wrapped() instanceof MemoryLRUCache) {
             ((MemoryLRUCache) wrapped()).setWhenEldestRemoved((key, value) -> {
                 // pass null to indicate removal
-                log(key, null);
+                log(key, null, context.timestamp());
             });
         }
     }
@@ -76,7 +76,7 @@ public class ChangeLoggingKeyValueBytesStore
     public void put(final Bytes key,
                     final byte[] value) {
         wrapped().put(key, value);
-        log(key, value);
+        log(key, value, context.timestamp());
     }
 
     @Override
@@ -85,7 +85,7 @@ public class ChangeLoggingKeyValueBytesStore
         final byte[] previous = wrapped().putIfAbsent(key, value);
         if (previous == null) {
             // then it was absent
-            log(key, value);
+            log(key, value, context.timestamp());
         }
         return previous;
     }
@@ -94,7 +94,7 @@ public class ChangeLoggingKeyValueBytesStore
     public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
         wrapped().putAll(entries);
         for (final KeyValue<Bytes, byte[]> entry : entries) {
-            log(entry.key, entry.value);
+            log(entry.key, entry.value, context.timestamp());
         }
     }
 
@@ -107,7 +107,7 @@ public class ChangeLoggingKeyValueBytesStore
     @Override
     public byte[] delete(final Bytes key) {
         final byte[] oldValue = wrapped().delete(key);
-        log(key, null);
+        log(key, null, context.timestamp());
         return oldValue;
     }
 
@@ -138,7 +138,7 @@ public class ChangeLoggingKeyValueBytesStore
         return wrapped().reverseAll();
     }
 
-    void log(final Bytes key, final byte[] value) {
-        context.logChange(name(), key, value, context.timestamp(), wrapped().getPosition());
+    void log(final Bytes key, final byte[] value, final long timestamp) {
+        context.logChange(name(), key, value, timestamp, wrapped().getPosition());
     }
 }
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java
index c01594b222c..be5805c53fc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java
@@ -32,9 +32,9 @@ public class ChangeLoggingListValueBytesStore extends ChangeLoggingKeyValueBytes
         // we need to log the full new list and thus call get() on the inner store below
         // if the value is a tombstone, we delete the whole list and thus can save the get call
         if (value == null) {
-            log(key, null);
+            log(key, null, context.timestamp());
         } else {
-            log(key, wrapped().get(key));
+            log(key, wrapped().get(key), context.timestamp());
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
index 71a1d0a9e14..efb265ee44d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
@@ -17,8 +17,11 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.util.List;
+
 import static org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.rawValue;
 import static org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.timestamp;
 
@@ -29,12 +32,29 @@ public class ChangeLoggingTimestampedKeyValueBytesStore extends ChangeLoggingKey
     }
 
     @Override
-    void log(final Bytes key,
-             final byte[] valueAndTimestamp) {
-        if (valueAndTimestamp != null) {
-            context.logChange(name(), key, rawValue(valueAndTimestamp), timestamp(valueAndTimestamp), wrapped().getPosition());
-        } else {
-            context.logChange(name(), key, null, context.timestamp(), wrapped().getPosition());
+    public void put(final Bytes key,
+                    final byte[] valueAndTimestamp) {
+        wrapped().put(key, valueAndTimestamp);
+        log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? context.timestamp() : timestamp(valueAndTimestamp));
+    }
+
+    @Override
+    public byte[] putIfAbsent(final Bytes key,
+                              final byte[] valueAndTimestamp) {
+        final byte[] previous = wrapped().putIfAbsent(key, valueAndTimestamp);
+        if (previous == null) {
+            // then it was absent
+            log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? context.timestamp() : timestamp(valueAndTimestamp));
+        }
+        return previous;
+    }
+
+    @Override
+    public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        wrapped().putAll(entries);
+        for (final KeyValue<Bytes, byte[]> entry : entries) {
+            final byte[] valueAndTimestamp = entry.value;
+            log(entry.key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? context.timestamp() : timestamp(valueAndTimestamp));
         }
     }
 }
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
index 75c18bc3442..aea84170733 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
@@ -32,10 +32,12 @@ class ChangeLoggingTimestampedWindowBytesStore extends ChangeLoggingWindowBytesS
     @Override
     void log(final Bytes key,
              final byte[] valueAndTimestamp) {
-        if (valueAndTimestamp != null) {
-            context.logChange(name(), key, rawValue(valueAndTimestamp), timestamp(valueAndTimestamp), wrapped().getPosition());
-        } else {
-            context.logChange(name(), key, null, context.timestamp(), wrapped().getPosition());
-        }
+        context.logChange(
+            name(),
+            key,
+            rawValue(valueAndTimestamp),
+            valueAndTimestamp != null ? timestamp(valueAndTimestamp) : context.timestamp(),
+            wrapped().getPosition()
+        );
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingVersionedKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingVersionedKeyValueBytesStore.java
index 030b1ce40c1..f9f3e79b54e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingVersionedKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingVersionedKeyValueBytesStore.java
@@ -56,11 +56,7 @@ public class ChangeLoggingVersionedKeyValueBytesStore extends ChangeLoggingKeyVa
     }
 
     @Override
-    void log(final Bytes key, final byte[] rawValueAndTimestamp) {
-        throw new IllegalStateException("versioned changelogging layer should call log(key, value, timestamp) instead");
-    }
-
-    private void log(final Bytes key, final byte[] value, final long timestamp) {
+    public void log(final Bytes key, final byte[] value, final long timestamp) {
         context.logChange(
             name(),
             key,
@@ -69,4 +65,4 @@ public class ChangeLoggingVersionedKeyValueBytesStore extends ChangeLoggingKeyVa
             wrapped().getPosition()
         );
     }
-}
\ No newline at end of file
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
index 3db8b7403a0..03506543c2b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
@@ -59,7 +59,7 @@ public class KeyValueToTimestampedKeyValueByteStoreAdapter implements KeyValueSt
     @Override
     public void put(final Bytes key,
                     final byte[] valueWithTimestamp) {
-        store.put(key, valueWithTimestamp == null ? null : rawValue(valueWithTimestamp));
+        store.put(key, rawValue(valueWithTimestamp));
     }
 
     @Override
@@ -67,14 +67,14 @@ public class KeyValueToTimestampedKeyValueByteStoreAdapter implements KeyValueSt
                               final byte[] valueWithTimestamp) {
         return convertToTimestampedFormat(store.putIfAbsent(
             key,
-            valueWithTimestamp == null ? null : rawValue(valueWithTimestamp)));
+            rawValue(valueWithTimestamp)));
     }
 
     @Override
     public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
         for (final KeyValue<Bytes, byte[]> entry : entries) {
             final byte[] valueWithTimestamp = entry.value;
-            store.put(entry.key, valueWithTimestamp == null ? null : rawValue(valueWithTimestamp));
+            store.put(entry.key, rawValue(valueWithTimestamp));
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializer.java
index 935c0f06e32..5ea4225c4c9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializer.java
@@ -66,6 +66,10 @@ class ValueAndTimestampDeserializer<V> implements WrappingNullableDeserializer<V
     }
 
     static byte[] rawValue(final byte[] rawValueAndTimestamp) {
+        if (rawValueAndTimestamp == null) {
+            return null;
+        }
+
         final int rawValueLength = rawValueAndTimestamp.length - 8;
         return ByteBuffer
             .allocate(rawValueLength)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
index 3c86c5aac88..733d4b99683 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
@@ -49,7 +49,7 @@ class WindowToTimestampedWindowByteStoreAdapter implements WindowStore<Bytes, by
     public void put(final Bytes key,
                     final byte[] valueWithTimestamp,
                     final long windowStartTimestamp) {
-        store.put(key, valueWithTimestamp == null ? null : rawValue(valueWithTimestamp), windowStartTimestamp);
+        store.put(key, rawValue(valueWithTimestamp), windowStartTimestamp);
     }
 
     @Override