Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/14 07:57:22 UTC

[GitHub] [kafka] rodesai commented on a change in pull request #8254: KIP-557: Add Emit On Change Support

rodesai commented on a change in pull request #8254:
URL: https://github.com/apache/kafka/pull/8254#discussion_r424941145



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
##########
@@ -53,4 +56,48 @@ void initStoreSerde(final ProcessorContext context) {
             keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
             valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde);
     }
-}
\ No newline at end of file
+
+    public RawAndDeserializedValue<V> getWithBinary(final K key) {
+        try {
+            return maybeMeasureLatency(() -> { 
+                final byte[] serializedValue = wrapped().get(keyBytes(key));
+                return new RawAndDeserializedValue<V>(serializedValue, outerValue(serializedValue));
+            }, time, getSensor);
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), key);
+            throw new ProcessorStateException(message, e);
+        }
+    }
+
+    public boolean putIfDifferentValues(final K key,
+                                        final ValueAndTimestamp<V> newValue,
+                                        final byte[] oldSerializedValue) {
+        try {
+            return maybeMeasureLatency(
+                () -> {
+                    final byte[] newSerializedValue = serdes.rawValue(newValue);
+                    if (ValueAndTimestampSerializer.compareValuesAndCheckForIncreasingTimestamp(oldSerializedValue, newSerializedValue)) {
+                        return false;
+                    } else {
+                        wrapped().put(keyBytes(key), newSerializedValue);

Review comment:
       @ConcurrencyPractitioner @vvcephei I'm trying to understand this change while debugging some broken tests in ksql. A couple of questions:
   
   Why, when the new value's timestamp is lower than the stored value's, do we put the new value into the store? Surely the store should keep the value with the newer timestamp? Otherwise we could wind up with a corrupt store that holds an older row in place of the latest one.
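
   The first question can be made concrete with a small Java sketch. It models the write decision only as this comment describes it: this is an assumption, not the logic of `ValueAndTimestampSerializer.compareValuesAndCheckForIncreasingTimestamp`, and `Versioned`, `putsUnderDescribedRule`, and `putsWithTimestampGuard` are hypothetical names. Under the described rule, a record with an older timestamp is written whether or not its value differs.

```java
import java.util.Objects;

final class TimestampRegressionSketch {
    // Hypothetical stand-in for ValueAndTimestamp<String>: a value plus its record timestamp.
    static final class Versioned {
        final String value;
        final long timestamp;

        Versioned(final String value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Assumed reading of the rule under review (not the Kafka code): skip the put only
    // when the value is unchanged and the timestamp did not decrease; put otherwise.
    static boolean putsUnderDescribedRule(final Versioned stored, final Versioned incoming) {
        if (stored == null) {
            return true;
        }
        final boolean sameValue = Objects.equals(stored.value, incoming.value);
        final boolean timestampNotDecreasing = incoming.timestamp >= stored.timestamp;
        return !(sameValue && timestampNotDecreasing);
    }

    // Guard the question implies (hypothetical): an older record never replaces a newer one.
    static boolean putsWithTimestampGuard(final Versioned stored, final Versioned incoming) {
        return stored == null || incoming.timestamp >= stored.timestamp;
    }

    public static void main(final String[] args) {
        final Versioned stored = new Versioned("A", 3);
        // Late records with an older timestamp (2) than the stored row (3).
        final Versioned lateDifferent = new Versioned("B", 2);
        final Versioned lateSame = new Versioned("A", 2);
        // Described rule: both late records overwrite the TS: 3 row. Prints true, true.
        System.out.println(putsUnderDescribedRule(stored, lateDifferent));
        System.out.println(putsUnderDescribedRule(stored, lateSame));
        // Guarded rule: 2 < 3, so the TS: 3 row is kept. Prints false.
        System.out.println(putsWithTimestampGuard(stored, lateDifferent));
    }
}
```

   The guarded variant drops a late record instead, so the stored timestamp can never move backwards.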
   
   Don't we still want to put the value in the store if the values are the same but the timestamp is newer? Otherwise, if we then get an out-of-order update with a different value and a timestamp between those of the two rows with the same value, we'd incorrectly put that value into the store. For example, the following updates:
   
   TS: 1, K: X, V: A
   TS: 3, K: X, V: A
   TS: 2, K: X, V: B
   
   would result in the table containing `K: X, V: B`, which is wrong: the row with the latest timestamp (TS: 3) has value A.
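
   Replaying the three updates above shows the second point. This is a hedged Java sketch, not Kafka Streams code: `Versioned`, `DESCRIBED_RULE`, `NEWEST_TIMESTAMP_WINS`, and `replay` are hypothetical, and `DESCRIBED_RULE` is only this comment's reading of the change (skip the put when the value is unchanged and the timestamp did not decrease). The alternative always stores the newer timestamp, even when only the timestamp changed, and never lets an older record replace a newer one.

```java
import java.util.List;
import java.util.Objects;
import java.util.function.BiPredicate;

final class EmitOnChangeReplaySketch {
    // Hypothetical stand-in for ValueAndTimestamp<String>.
    static final class Versioned {
        final String value;
        final long timestamp;

        Versioned(final String value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Assumed reading of the rule under review: put unless the value is unchanged
    // and the timestamp did not decrease.
    static final BiPredicate<Versioned, Versioned> DESCRIBED_RULE = (stored, incoming) ->
        stored == null
            || !Objects.equals(stored.value, incoming.value)
            || incoming.timestamp < stored.timestamp;

    // Hypothetical alternative: keep the newest timestamp, even for an unchanged value,
    // and drop any record older than the stored one.
    static final BiPredicate<Versioned, Versioned> NEWEST_TIMESTAMP_WINS = (stored, incoming) ->
        stored == null || incoming.timestamp >= stored.timestamp;

    // Applies updates for a single key in arrival order and returns the final stored row.
    static Versioned replay(final List<Versioned> updates,
                            final BiPredicate<Versioned, Versioned> shouldPut) {
        Versioned stored = null;
        for (final Versioned update : updates) {
            if (shouldPut.test(stored, update)) {
                stored = update;
            }
        }
        return stored;
    }

    public static void main(final String[] args) {
        // The updates for key X from the comment, in arrival order.
        final List<Versioned> updates = List.of(
            new Versioned("A", 1),
            new Versioned("A", 3),
            new Versioned("B", 2));
        final Versioned described = replay(updates, DESCRIBED_RULE);
        final Versioned newest = replay(updates, NEWEST_TIMESTAMP_WINS);
        // Prints "described rule: V=B, TS=2"
        System.out.println("described rule: V=" + described.value + ", TS=" + described.timestamp);
        // Prints "newest wins: V=A, TS=3"
        System.out.println("newest wins: V=" + newest.value + ", TS=" + newest.timestamp);
    }
}
```

   Refreshing the timestamp of an unchanged value does not have to mean emitting it downstream: the store write and the forwarding decision could be kept separate, so an unchanged value would still not be forwarded.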




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org