Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/09/02 14:45:34 UTC

[GitHub] [kafka] spena commented on a change in pull request #11252: KAFKA-13216: Use a KV with list serde for the shared store

spena commented on a change in pull request #11252:
URL: https://github.com/apache/kafka/pull/11252#discussion_r701101588



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSideDeserializer.java
##########
@@ -48,15 +50,22 @@ public void configure(final Map<String, ?> configs, final boolean isKey) {
 
     @Override
     public KeyAndJoinSide<K> deserialize(final String topic, final byte[] data) {
-        final boolean bool = data[0] == 1;
+        final boolean bool = data[8] == 1;

Review comment:
       It would be good to add a constant for the `8`, so it is easier to read. For instance, `rawKey()` has `new byte[data.length - 9]`, which I assume is `len - TIMESTAMP - BOOL`.
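
       To make the suggestion concrete, here is a minimal sketch of what named offsets could look like. It assumes the serialized layout is `[timestamp: 8 bytes][join side: 1 byte][raw key]`, which is inferred from `data[8]` and `data.length - 9` above. The class name `KeyAndJoinSideLayout` and the constant names are hypothetical and not part of the PR.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper; the names here are illustrative and not taken from the PR.
final class KeyAndJoinSideLayout {
    // Assumed layout: [timestamp: 8 bytes][join side: 1 byte][raw key: remaining bytes]
    static final int TIMESTAMP_SIZE = Long.BYTES;
    static final int BOOLEAN_SIZE = Byte.BYTES;
    static final int BOOLEAN_OFFSET = TIMESTAMP_SIZE;
    static final int KEY_OFFSET = TIMESTAMP_SIZE + BOOLEAN_SIZE;

    private KeyAndJoinSideLayout() { }

    // Reads the big-endian timestamp stored in the first TIMESTAMP_SIZE bytes.
    static long timestamp(final byte[] data) {
        return ByteBuffer.wrap(data, 0, TIMESTAMP_SIZE).getLong();
    }

    // The join-side flag sits immediately after the timestamp.
    static boolean isLeftSide(final byte[] data) {
        return data[BOOLEAN_OFFSET] == 1;
    }

    // Everything after the timestamp and the flag is the serialized key.
    static byte[] rawKey(final byte[] data) {
        return Arrays.copyOfRange(data, KEY_OFFSET, data.length);
    }
}
```

       With constants like these, `data.length - 9` would read as `data.length - KEY_OFFSET`, and the flag lookup would no longer need a bare `8`.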

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSideSerializer.java
##########
@@ -55,9 +57,11 @@ public void configure(final Map<String, ?> configs, final boolean isKey) {
     public byte[] serialize(final String topic, final KeyAndJoinSide<K> data) {
         final byte boolByte = (byte) (data.isLeftSide() ? 1 : 0);
         final byte[] keyBytes = keySerializer.serialize(topic, data.getKey());
+        final byte[] timestampBytes = timestampSerializer.serialize(topic, data.getTimestamp());
 
         return ByteBuffer
-            .allocate(keyBytes.length + 1)
+            .allocate(8 + keyBytes.length + 1)

Review comment:
       Should the `8` be a named constant, or just `timestampBytes.length`?
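
       As a rough illustration of the second option, the sketch below sizes the buffer from the serialized timestamp instead of a literal. It is standalone: `ByteBuffer.putLong` stands in for `timestampSerializer.serialize(...)`, the write order (timestamp, flag, key) is an assumption inferred from the deserializer reading the flag at index 8, and the class name `KeyAndJoinSideBytes` is hypothetical.

```java
import java.nio.ByteBuffer;

// Hypothetical standalone sketch; not the PR's serializer.
final class KeyAndJoinSideBytes {
    private KeyAndJoinSideBytes() { }

    static byte[] serialize(final long timestamp, final boolean isLeftSide, final byte[] keyBytes) {
        // Stand-in for timestampSerializer.serialize(...): 8 bytes, big-endian.
        final byte[] timestampBytes = ByteBuffer.allocate(Long.BYTES).putLong(timestamp).array();
        final byte boolByte = (byte) (isLeftSide ? 1 : 0);

        // The buffer size follows the actual serialized lengths, with no literal 8.
        return ByteBuffer
            .allocate(timestampBytes.length + Byte.BYTES + keyBytes.length)
            .put(timestampBytes)
            .put(boolByte)
            .put(keyBytes)
            .array();
    }
}
```

       One trade-off: the deserializer still needs a fixed offset to find the flag, so a shared constant may be the more consistent choice across both classes.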

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
##########
@@ -63,6 +63,10 @@
     private InternalProcessorContext context;
     private TaskId taskId;
 
+    interface WindowedKeySerde {
+        Bytes serialize(final Bytes key, final long timestamp, final int seqnum);
+    }
+

Review comment:
       Is this used somewhere?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
##########
@@ -33,172 +39,112 @@
  * For key range queries, like fetch(key, fromTime, toTime), use the {@link RocksDBWindowStore}
  * which uses the {@link WindowKeySchema} to serialize the record bytes for efficient key queries.
  */
+@SuppressWarnings("unchecked")
 public class RocksDBTimeOrderedWindowStore

Review comment:
       Two things:
   - It seems `TimeOrderedKeySchema` is not needed anymore. Should the class be removed?
   - Should we rename the class to remove the `Window` part?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -146,7 +146,7 @@ public void process(final K key, final V1 value) {
 
                     outerJoinWindowStore.ifPresent(store -> {
                         // Delete the joined record from the non-joined outer window store
-                        store.put(KeyAndJoinSide.make(!isLeftSide, key), null, otherRecordTimestamp);
+                        store.put(KeyAndJoinSide.make(!isLeftSide, key, otherRecordTimestamp), null);

Review comment:
       Should you call `store.delete(KeyAndJoinSide.make(!isLeftSide, key, otherRecordTimestamp))` now?
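
       For context on why the two calls are interchangeable in effect: as far as I know, Kafka's `KeyValueStore#put` treats a `null` value as a delete, while `delete` states the intent directly and returns the old value. The toy store below is a hypothetical in-memory stand-in, not Kafka's API, and only illustrates that both paths leave the store in the same state.

```java
import java.util.TreeMap;

// Hypothetical in-memory stand-in for a key-value store; not Kafka's API.
final class ToyKeyValueStore<K extends Comparable<K>, V> {
    private final TreeMap<K, V> entries = new TreeMap<>();

    // Mirrors the assumed convention that a null value removes the key.
    void put(final K key, final V value) {
        if (value == null) {
            entries.remove(key);
        } else {
            entries.put(key, value);
        }
    }

    // Removes the key and returns the previous value, or null if absent.
    V delete(final K key) {
        return entries.remove(key);
    }

    V get(final K key) {
        return entries.get(key);
    }

    int size() {
        return entries.size();
    }
}
```

       Either call empties the entry; `delete` just reads more clearly at the call site.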

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -209,37 +208,36 @@ private void emitNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, Left
             // reset to MAX_VALUE in case the store is empty
             sharedTimeTracker.minTime = Long.MAX_VALUE;
 
-            try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> it = store.all()) {
+            try (final KeyValueIterator<KeyAndJoinSide<K>, LeftOrRightValue> it = store.all()) {
                 while (it.hasNext()) {
-                    final KeyValue<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> record = it.next();
+                    final KeyValue<KeyAndJoinSide<K>, LeftOrRightValue> record = it.next();
 
-                    final Windowed<KeyAndJoinSide<K>> windowedKey = record.key;
-                    final LeftOrRightValue value = record.value;
-                    sharedTimeTracker.minTime = windowedKey.window().start();
+                    final KeyAndJoinSide<K> keyAndJoinSide = record.key;
+                    final LeftOrRightValue<V1, V2> value = record.value;
+                    final K key = keyAndJoinSide.getKey();
+                    final long timestamp = keyAndJoinSide.getTimestamp();
+                    sharedTimeTracker.minTime = timestamp;
 
                     // Skip next records if window has not closed
-                    if (windowedKey.window().start() + joinAfterMs + joinGraceMs >= sharedTimeTracker.streamTime) {
+                    if (timestamp + joinAfterMs + joinGraceMs >= sharedTimeTracker.streamTime) {
                         break;
                     }
 
-                    final K key = windowedKey.key().getKey();
-                    final long time = windowedKey.window().start();
-
                     final R nullJoinedValue;
                     if (isLeftSide) {
                         nullJoinedValue = joiner.apply(key,
-                            (V1) value.getLeftValue(),
-                            (V2) value.getRightValue());
+                                value.getLeftValue(),
+                                value.getRightValue());
                     } else {
                         nullJoinedValue = joiner.apply(key,
-                            (V1) value.getRightValue(),
-                            (V2) value.getLeftValue());
+                                (V1) value.getRightValue(),
+                                (V2) value.getLeftValue());
                     }
 
-                    context().forward(key, nullJoinedValue, To.all().withTimestamp(time));
+                    context().forward(key, nullJoinedValue, To.all().withTimestamp(timestamp));
 
                     // Delete the key from the outer window store now it is emitted
-                    store.put(record.key.key(), null, record.key.window().start());
+                    store.put(keyAndJoinSide, null);

Review comment:
       Should you call `store.delete(keyAndJoinSide)` now?



