Posted to jira@kafka.apache.org by "vcrfxia (via GitHub)" <gi...@apache.org> on 2023/04/03 23:52:44 UTC

[GitHub] [kafka] vcrfxia opened a new pull request, #13496: KAFKA-14834: [1/N] Add timestamped get to KTableValueGetter

vcrfxia opened a new pull request, #13496:
URL: https://github.com/apache/kafka/pull/13496

   (This PR is stacked on https://github.com/apache/kafka/pull/13442. Only the last commit needs to be reviewed separately.)
   
   In preparation for updating DSL processors to use versioned stores (cf [KIP-914](https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+Join+Processor+Semantics+for+Versioned+Stores)), this PR adds two new methods to KTableValueGetter: `isVersioned()` and `get(key, asOfTimestamp)` and updates all existing implementations accordingly.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on a diff in pull request #13496: KAFKA-14834: [1/N] Add timestamped get to KTableValueGetter

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13496:
URL: https://github.com/apache/kafka/pull/13496#discussion_r1160906740


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java:
##########
@@ -153,11 +154,37 @@ public void init(final ProcessorContext<?, ?> context) {
 
         @Override
         public ValueAndTimestamp<VOut> get(final K key) {
-            final ValueAndTimestamp<V1> valueAndTimestamp1 = valueGetter1.get(key);
+            return computeJoin(key, valueGetter1::get, valueGetter2::get);
+        }
+
+        @Override
+        public ValueAndTimestamp<VOut> get(final K key, final long asOfTimestamp) {

Review Comment:
   > For example, is caching a functional change?
   
   I would say yes, because it impacts what intermediate results one gets. But I agree that the answer is not "black or white", because -> `In either case, the guarantee is that the latest (by offset) record is the "correct" value for the key, and the others should be effectively ignored.`
   
   > but if they somehow had access to the inner/left/outer join processor (before the join merge processor) and called get(key, ts) on the KTableValueGetter, what should be returned?
   
   For this case, they won't query the result table but the input table, right (and only indirectly, cf. my comments below)? -> `I think it's natural to return the correct older join result because we have it available.` You say "older join result" -- I don't think they would get a join result, would they?
   
   Overall, it seems to be a very unlikely scenario anyway. But I would like to call out the following: in the end, the input table is maintained by an upstream processor, and the "join processor" just has access to it. Thus, if a user would tap into the "join processor" to indirectly access the upstream table store, it seems ok to me if they would actually get a different result. If they really want to query the table, they should not tap into the "join processor" but into the upstream processor that maintains the table.
   
   So to me it seems the value getter should always use `get(k)` and not `get(k, ts)` to re-compute the "latest by timestamp" result, because it is the same result that a materialized downstream table would store? Not sure what the cleanest way to implement this is, but given how the "join processor" works, it basically gets a versioned input ktable, produces a non-versioned ktable, and drops out-of-order records. So if we would only expose a value getter that does not support `get(k, ts)`, it would be reasonable to me. -- Also, we are talking about implementation details, not public API -- the public API is two input KTables and one output KTable -- the fact that we have 5 processors is internal, and we can change it any time -- so if users tap into any of the 5 internal processors, they don't have any guarantee that it will be backward compatible.





[GitHub] [kafka] vcrfxia commented on a diff in pull request #13496: KAFKA-14834: [1/N] Add timestamped get to KTableValueGetter

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13496:
URL: https://github.com/apache/kafka/pull/13496#discussion_r1163422597


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java:
##########
@@ -25,5 +25,21 @@
 
     ValueAndTimestamp<V> get(K key);
 
+    /**
+     * Returns the latest record version, associated with the provided key, with timestamp
+     * not exceeding the provided timestamp bound. This method may only be called if
+     * {@link #isVersioned()} is true.
+     */
+    default ValueAndTimestamp<V> get(K key, long asOfTimestamp) {
+        throw new UnsupportedOperationException("get(key, timestamp) is only supported for versioned stores");
+    }
+
+    /**
+     * @return whether this value getter supports multiple record versions for the same key.
+     *         If true, then {@link #get(Object, long)} must be implemented. If not, then
+     *         {@link #get(Object, long)} must not be called.
+     */
+    boolean isVersioned();

Review Comment:
   Heh, I made one of them non-default in order to force future authors of new value getters to have to consider at least one of them, with the hope that considering one implies considering the other as well :) I will leave it as-is for now, can always be changed in the future.
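   
   The trade-off discussed here (a throwing default for `get(key, asOfTimestamp)` combined with an abstract `isVersioned()`) can be illustrated with a minimal, self-contained sketch. `SketchValueGetter`, `LatestOnlyGetter`, and `HistoryGetter` are hypothetical names invented for this illustration; they are not the Kafka classes, and the in-memory maps merely stand in for state stores.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.TreeMap;

   // Hypothetical stand-in for the internal value getter interface (not the Kafka class).
   interface SketchValueGetter<K, V> {
       V get(K key);

       // Throwing default: non-versioned getters do not need to override this.
       default V get(K key, long asOfTimestamp) {
           throw new UnsupportedOperationException("get(key, timestamp) is only supported for versioned stores");
       }

       // Deliberately abstract: every implementation must state whether it is versioned.
       boolean isVersioned();
   }

   // Non-versioned getter: keeps only the latest value per key.
   final class LatestOnlyGetter<K, V> implements SketchValueGetter<K, V> {
       private final Map<K, V> latest = new HashMap<>();

       void put(K key, V value) { latest.put(key, value); }

       @Override public V get(K key) { return latest.get(key); }

       @Override public boolean isVersioned() { return false; }
   }

   // Versioned getter: keeps every version of a key, indexed by timestamp.
   final class HistoryGetter<K, V> implements SketchValueGetter<K, V> {
       private final Map<K, TreeMap<Long, V>> history = new HashMap<>();

       void put(K key, long timestamp, V value) {
           history.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
       }

       @Override public V get(K key) {
           TreeMap<Long, V> versions = history.get(key);
           return versions == null ? null : versions.lastEntry().getValue();
       }

       // Latest version whose timestamp does not exceed the bound.
       @Override public V get(K key, long asOfTimestamp) {
           TreeMap<Long, V> versions = history.get(key);
           if (versions == null) return null;
           Map.Entry<Long, V> entry = versions.floorEntry(asOfTimestamp);
           return entry == null ? null : entry.getValue();
       }

       @Override public boolean isVersioned() { return true; }
   }
   ```
   
   In this sketch, the compiler rejects a new getter that forgets `isVersioned()`, while forgetting the timestamped `get` only surfaces at runtime; that asymmetry is the inconsistency raised in the review.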





[GitHub] [kafka] vcrfxia commented on a diff in pull request #13496: KAFKA-14834: [1/N] Add timestamped get to KTableValueGetter

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13496:
URL: https://github.com/apache/kafka/pull/13496#discussion_r1163422113


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java:
##########
@@ -153,11 +154,37 @@ public void init(final ProcessorContext<?, ?> context) {
 
         @Override
         public ValueAndTimestamp<VOut> get(final K key) {
-            final ValueAndTimestamp<V1> valueAndTimestamp1 = valueGetter1.get(key);
+            return computeJoin(key, valueGetter1::get, valueGetter2::get);
+        }
+
+        @Override
+        public ValueAndTimestamp<VOut> get(final K key, final long asOfTimestamp) {

Review Comment:
   Sounds like your instinct is still that it's better to formalize our understanding that the output of a table-table join today is not versioned, by having the value getter reflect that. I trust your instinct here and am on board -- just made the changes.
   
   A benefit of this is that it is now easier to explain to users that any stateful transformation (aggregate, join, and also suppress -- if suppress should be considered stateful in the same sense) will cause a versioned table to no longer be versioned (unless the result is explicitly materialized with a versioned store, which should not be done for joins anyway). 
   
   The cost is that stream-(table-table) join does not get versioned semantics, but I think that's acceptable since (stream-table)-table does.







[GitHub] [kafka] mjsax merged pull request #13496: KAFKA-14834: [1/N] Add timestamped get to KTableValueGetter

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax merged PR #13496:
URL: https://github.com/apache/kafka/pull/13496




[GitHub] [kafka] mjsax commented on a diff in pull request #13496: KAFKA-14834: [1/N] Add timestamped get to KTableValueGetter

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13496:
URL: https://github.com/apache/kafka/pull/13496#discussion_r1163228842


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java:
##########
@@ -159,11 +160,36 @@ public void init(final ProcessorContext<?, ?> context) {
 
         @Override
         public ValueAndTimestamp<VOut> get(final K key) {
-            final ValueAndTimestamp<V1> valueAndTimestamp1 = valueGetter1.get(key);
+            return computeJoin(key, valueGetter1::get, valueGetter2::get);
+        }
+
+        @Override
+        public ValueAndTimestamp<VOut> get(final K key, final long asOfTimestamp) {
+            return computeJoin(
+                key,
+                k -> valueGetter1.get(k, asOfTimestamp),
+                k -> valueGetter2.get(k, asOfTimestamp));

Review Comment:
   As above -- can we get rid of the indirection by calling `get(k, asOfTimestamp)` directly and passing the result into `computeJoin()`?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java:
##########
@@ -154,9 +155,34 @@ public void init(final ProcessorContext<?, ?> context) {
 
         @Override
         public ValueAndTimestamp<VOut> get(final K key) {
+            return computeJoin(key, valueGetter1::get, valueGetter2::get);
+        }
+
+        @Override
+        public ValueAndTimestamp<VOut> get(final K key, final long asOfTimestamp) {
+            return computeJoin(
+                key,
+                k -> valueGetter1.get(k, asOfTimestamp),

Review Comment:
   As above.





[GitHub] [kafka] mjsax commented on a diff in pull request #13496: KAFKA-14834: [1/N] Add timestamped get to KTableValueGetter

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13496:
URL: https://github.com/apache/kafka/pull/13496#discussion_r1163224319


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java:
##########
@@ -153,11 +154,37 @@ public void init(final ProcessorContext<?, ?> context) {
 
         @Override
         public ValueAndTimestamp<VOut> get(final K key) {
-            final ValueAndTimestamp<V1> valueAndTimestamp1 = valueGetter1.get(key);
+            return computeJoin(key, valueGetter1::get, valueGetter2::get);

Review Comment:
   Just wondering: why do we pass in the `get()` method instead of executing `get()` directly and passing in the result?
   
   Same for `get(final K key, final long asOfTimestamp)`. Looking into the code, I think we can also remove the `keyValueMapper`, which only returns the key anyway, as a side cleanup?
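   
   The two shapes being compared can be sketched as follows. This is an illustration under stated assumptions, not the code in the PR: `Stamped` is a hypothetical stand-in for `ValueAndTimestamp`, the method names are invented, and taking the maximum of the two input timestamps as the result timestamp is an assumption (it matches the `(a5, b1)` with `ts=5` example elsewhere in this thread).
   
   ```java
   import java.util.function.BiFunction;
   import java.util.function.Function;

   final class JoinSketch {
       // Minimal stand-in for a value plus its timestamp (not the Kafka class).
       static final class Stamped<V> {
           final V value;
           final long timestamp;

           Stamped(V value, long timestamp) {
               this.value = value;
               this.timestamp = timestamp;
           }
       }

       // Shape under review: the lookups are passed in and invoked inside the helper.
       static <K, V1, V2, R> Stamped<R> computeJoinWithLookups(
               K key,
               Function<K, Stamped<V1>> lookup1,
               Function<K, Stamped<V2>> lookup2,
               BiFunction<V1, V2, R> joiner) {
           Stamped<V1> left = lookup1.apply(key);
           if (left == null) {
               return null; // the second lookup is never performed
           }
           return computeJoin(left, lookup2.apply(key), joiner);
       }

       // Suggested shape: the caller performs both lookups and passes the results.
       static <V1, V2, R> Stamped<R> computeJoin(
               Stamped<V1> left, Stamped<V2> right, BiFunction<V1, V2, R> joiner) {
           if (left == null || right == null) {
               return null; // inner join: both sides are required
           }
           return new Stamped<>(
               joiner.apply(left.value, right.value),
               Math.max(left.timestamp, right.timestamp));
       }
   }
   ```
   
   One difference the sketch makes visible: the function-passing form can skip the second lookup when the first side is missing, whereas the value-passing form always pays for both. Whether that matters for the real processors is not stated in this thread.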





[GitHub] [kafka] mjsax commented on a diff in pull request #13496: KAFKA-14834: [1/N] Add timestamped get to KTableValueGetter

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13496:
URL: https://github.com/apache/kafka/pull/13496#discussion_r1163228842


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java:
##########
@@ -159,11 +160,36 @@ public void init(final ProcessorContext<?, ?> context) {
 
         @Override
         public ValueAndTimestamp<VOut> get(final K key) {
-            final ValueAndTimestamp<V1> valueAndTimestamp1 = valueGetter1.get(key);
+            return computeJoin(key, valueGetter1::get, valueGetter2::get);
+        }
+
+        @Override
+        public ValueAndTimestamp<VOut> get(final K key, final long asOfTimestamp) {
+            return computeJoin(
+                key,
+                k -> valueGetter1.get(k, asOfTimestamp),
+                k -> valueGetter2.get(k, asOfTimestamp));

Review Comment:
   As above -- can we get rid of the indirection by calling `get(k, asOfTimestamp)` directly and passing the result into `computeJoin()`? (Also in other places below.)





[GitHub] [kafka] vcrfxia commented on a diff in pull request #13496: KAFKA-14834: [1/N] Add timestamped get to KTableValueGetter

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13496:
URL: https://github.com/apache/kafka/pull/13496#discussion_r1158943426


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java:
##########
@@ -25,5 +25,21 @@
 
     ValueAndTimestamp<V> get(K key);
 
+    /**
+     * Returns the latest record version, associated with the provided key, with timestamp
+     * not exceeding the provided timestamp bound. This method may only be called if
+     * {@link #isVersioned()} is true.
+     */
+    default ValueAndTimestamp<V> get(K key, long asOfTimestamp) {
+        throw new UnsupportedOperationException("get(key, timestamp) is only supported for versioned stores");
+    }
+
+    /**
+     * @return whether this value getter supports multiple record versions for the same key.
+     *         If true, then {@link #get(Object, long)} must be implemented. If not, then
+     *         {@link #get(Object, long)} must not be called.
+     */
+    boolean isVersioned();

Review Comment:
   Sure, I was on the fence about this. The benefit of not having a default value is that it forces all new implementations to think about whether versioning can/should be supported, but I'm not too worried since it's an internal interface. If you prefer having the default, I will make the update.





[GitHub] [kafka] mjsax commented on a diff in pull request #13496: KAFKA-14834: [1/N] Add timestamped get to KTableValueGetter

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13496:
URL: https://github.com/apache/kafka/pull/13496#discussion_r1158814912


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java:
##########
@@ -153,11 +154,37 @@ public void init(final ProcessorContext<?, ?> context) {
 
         @Override
         public ValueAndTimestamp<VOut> get(final K key) {
-            final ValueAndTimestamp<V1> valueAndTimestamp1 = valueGetter1.get(key);
+            return computeJoin(key, valueGetter1::get, valueGetter2::get);
+        }
+
+        @Override
+        public ValueAndTimestamp<VOut> get(final K key, final long asOfTimestamp) {

Review Comment:
   Wondering if this is correct? This "ValueGetter" queries the (non-materialized) result table of an inner join and re-computes the result on the fly? Maybe it will become clear in a follow-up PR, but I would love to get a short, high-level explanation of why it's the right thing to do (as I don't understand it right now).





[GitHub] [kafka] vcrfxia commented on a diff in pull request #13496: KAFKA-14834: [1/N] Add timestamped get to KTableValueGetter

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13496:
URL: https://github.com/apache/kafka/pull/13496#discussion_r1162213072


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java:
##########
@@ -153,11 +154,37 @@ public void init(final ProcessorContext<?, ?> context) {
 
         @Override
         public ValueAndTimestamp<VOut> get(final K key) {
-            final ValueAndTimestamp<V1> valueAndTimestamp1 = valueGetter1.get(key);
+            return computeJoin(key, valueGetter1::get, valueGetter2::get);
+        }
+
+        @Override
+        public ValueAndTimestamp<VOut> get(final K key, final long asOfTimestamp) {

Review Comment:
   > You say "older join result" -- don't think they would get a join result, would they?
   
   They get what should've been a join result, if the join were to emit a complete history of older join results (which it does not due to computational expediency). Here's a concrete example to check we're on the same page. Suppose we have an inner join, and all records are for the same key:
   ```
   A: (a5, ts=5)
   B: (b1, ts=1) --> triggers join result (a5, b1) with ts=5
   A: (a2, ts=2) --> no new join result, because this record is out-of-order
   ```
   If the result is not materialized and someone calls `get(k, 2)` on the value getter, then the value getter will join `a2` and `b1` on the fly and return `(a2, b1)` even though this was never emitted downstream. 
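   
   The example above can be replayed with a small, self-contained sketch. It is only an illustration: each `TreeMap` stands in for the versions of one key in a versioned store, and `asOf` and `joinAsOf` are hypothetical helper names, not Kafka APIs.
   
   ```java
   import java.util.Map;
   import java.util.TreeMap;

   final class OnTheFlyJoinExample {
       // Latest version whose timestamp does not exceed ts, or null if there is none.
       static String asOf(TreeMap<Long, String> versions, long ts) {
           Map.Entry<Long, String> entry = versions.floorEntry(ts);
           return entry == null ? null : entry.getValue();
       }

       // Inner join of both sides as of ts; null if either side has no version yet.
       static String joinAsOf(TreeMap<Long, String> a, TreeMap<Long, String> b, long ts) {
           String left = asOf(a, ts);
           String right = asOf(b, ts);
           return (left == null || right == null) ? null : "(" + left + ", " + right + ")";
       }

       public static void main(String[] args) {
           TreeMap<Long, String> a = new TreeMap<>();
           TreeMap<Long, String> b = new TreeMap<>();
           a.put(5L, "a5");
           b.put(1L, "b1");
           a.put(2L, "a2"); // out-of-order: stored as an older version, nothing emitted

           System.out.println(joinAsOf(a, b, 2L)); // (a2, b1), never emitted downstream
           System.out.println(joinAsOf(a, b, 5L)); // (a5, b1), the emitted result
       }
   }
   ```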
   
   I gave this some more thought and I think the behavior could be desirable, even though I agree with your statement:
   > given how the "join processor" works, it basically get a versioned input ktable, and produced a non-versioned ktable and drops out-of-order records. So if we would only expose a value-getter that does not support get(k, ts) it would be reasonable to me.
   
   I think there actually is a situation in which `get(k, ts)` would be called on this join value getter today. If the table-table join result is not materialized, and is directly joined to a stream, then if the table-table join result is identified as "versioned" then the stream-table join will call `get(k, ts)` on the value getter. This situation is really interesting because it would be wrong for the user to explicitly materialize the result of the table-table join with a versioned store, and then join it to the stream, but if they do not explicitly materialize the result and instead perform the join directly, then they can get proper stream-table join semantics using the value getter.
   
   Assuming my understanding is correct, then we have two options:
   1. Say that the result of a table-table join (where both input tables are versioned) where the result is not explicitly materialized is versioned, and have the value getter support versioning, as in the current PR. Then the stream-(table-table) join uses fully versioned semantics and returns correct results.
   2. Say that the result of a table-table join (where both input tables are versioned) where the result is not explicitly materialized is not versioned, and update the value getter to reflect this. Then the stream-(table-table) join does not use versioned semantics. Users need to perform a (stream-table)-table join to get versioned semantics instead.
   
   The first option is nice in that now `stream-(table-table)` and `(stream-table)-table` joins with no intermediate materialization produce the same results, but it's also confusing because `stream-(table-table)` produces different results if the user materializes the result of the table-table join as a versioned store (which is wrong). 
   
   WDYT? I'm happy with either approach now that I feel like we've discussed all angles fully. We do need to make a decision in this PR though: if my understanding is correct that `get(k, ts)` will be called from downstream stream-table joins whenever it's supported, then changing it in the future will have compatibility implications.
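   
   To make the difference between the two options concrete, here is a hedged sketch of how a downstream stream-table lookup might dispatch on `isVersioned()`. Everything in it is hypothetical (`Getter`, `lookup`, `toyGetter`); the toy getter holds precomputed join results for a single key so that both options can be compared side by side.
   
   ```java
   import java.util.Map;
   import java.util.TreeMap;

   final class StreamSideLookupSketch {
       // Hypothetical, simplified getter over String values.
       interface Getter {
           String get(String key);
           String get(String key, long asOfTimestamp);
           boolean isVersioned();
       }

       // Use the stream record's timestamp only when the table side reports itself as versioned.
       static String lookup(Getter table, String key, long streamRecordTimestamp) {
           return table.isVersioned()
               ? table.get(key, streamRecordTimestamp)
               : table.get(key);
       }

       // Toy single-key getter; the flag switches between option 1 (true) and option 2 (false).
       static Getter toyGetter(TreeMap<Long, String> versions, boolean versioned) {
           return new Getter() {
               @Override public String get(String key) {
                   return versions.isEmpty() ? null : versions.lastEntry().getValue();
               }

               @Override public String get(String key, long asOfTimestamp) {
                   if (!versioned) {
                       throw new UnsupportedOperationException("not versioned");
                   }
                   Map.Entry<Long, String> entry = versions.floorEntry(asOfTimestamp);
                   return entry == null ? null : entry.getValue();
               }

               @Override public boolean isVersioned() {
                   return versioned;
               }
           };
       }
   }
   ```
   
   With versions `(a2, b1)` at ts=2 and `(a5, b1)` at ts=5, a stream record at ts=3 would see `(a2, b1)` under option 1 and `(a5, b1)` under option 2.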







[GitHub] [kafka] mjsax commented on a diff in pull request #13496: KAFKA-14834: [1/N] Add timestamped get to KTableValueGetter

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13496:
URL: https://github.com/apache/kafka/pull/13496#discussion_r1160433833


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java:
##########
@@ -25,5 +25,21 @@
 
     ValueAndTimestamp<V> get(K key);
 
+    /**
+     * Returns the latest record version, associated with the provided key, with timestamp
+     * not exceeding the provided timestamp bound. This method may only be called if
+     * {@link #isVersioned()} is true.
+     */
+    default ValueAndTimestamp<V> get(K key, long asOfTimestamp) {
+        throw new UnsupportedOperationException("get(key, timestamp) is only supported for versioned stores");
+    }
+
+    /**
+     * @return whether this value getter supports multiple record versions for the same key.
+     *         If true, then {@link #get(Object, long)} must be implemented. If not, then
+     *         {@link #get(Object, long)} must not be called.
+     */
+    boolean isVersioned();

Review Comment:
   The same argument seems to apply to `get()`, and it does have a default? Seems inconsistent to me? But I agree, it's internal, so I also don't feel strongly about it. I'll leave it to your judgement.





[GitHub] [kafka] mjsax commented on a diff in pull request #13496: KAFKA-14834: [1/N] Add timestamped get to KTableValueGetter

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13496:
URL: https://github.com/apache/kafka/pull/13496#discussion_r1163195888


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java:
##########
@@ -153,11 +154,37 @@ public void init(final ProcessorContext<?, ?> context) {
 
         @Override
         public ValueAndTimestamp<VOut> get(final K key) {
-            final ValueAndTimestamp<V1> valueAndTimestamp1 = valueGetter1.get(key);
+            return computeJoin(key, valueGetter1::get, valueGetter2::get);
+        }
+
+        @Override
+        public ValueAndTimestamp<VOut> get(final K key, final long asOfTimestamp) {

Review Comment:
   > The first option is nice in that now stream-(table-table) and (stream-table)-table joins with no intermediate materialization produce the same results,
   
   But are both joins really the same if the intermediate table-table result is not materialized? Semantically, the intermediate table-table result is a non-versioned store, and thus we cannot do a lookup into the history of it, ie, we have a stream-tsTable join. The second query is two `stream-vTable` joins so it seems ok if they produce different results?
   
   > but it's also confusing because stream-(table-table) produces different results if the user materializes the result of the table-table join as a versioned store (which is wrong).
   
   I don't see it as confusing (it might be very subtle, to be fair...) -- the intermediate result of a non-materialized t-t-join is semantically a tsTable (of course, it does not get out-of-order updates, because the _join_ that computes it has two versioned tables as input and thus drops out-of-order updates) -- if the intermediate result is materialized as a tsKV-store, semantics should not change. If one materialized it as a vKV store though, it seems ok that semantics change, because the semantics of the intermediate result change from being non-versioned to versioned, and thus the join changes from `stream-tsTable` to `stream-vTable`.
   
   My point is that for a table-table join, there are 4 entities: both input tables, the join operator, plus the result table. The two input tables (v-table vs ts-table) determine what join operator we pick (ie, drop out-of-order updates yes/no), and the join produces a result that we now feed into the result table with its own semantics (by default, ts-semantics, not v-semantics) -- Of course, depending on the used join semantics, we apply different updates to the table, but we don't change the table semantics itself.
   
   I guess bottom line is: this PR should be fine, but we need to ensure to use the right version of `get` upstream?
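   
   The "drop out-of-order updates yes/no" choice described above can be sketched as a small filter in front of the join. This is a hypothetical illustration only: `OutOfOrderFilterSketch` and `accept` are invented names, and tracking the latest timestamp in a separate map is an assumption made to keep the sketch self-contained (a real processor would more plausibly consult its input store).
   
   ```java
   import java.util.HashMap;
   import java.util.Map;

   final class OutOfOrderFilterSketch {
       private final boolean dropOutOfOrder;
       private final Map<String, Long> latestTimestamp = new HashMap<>();

       // Versioned inputs select the variant that drops out-of-order updates.
       OutOfOrderFilterSketch(boolean inputIsVersioned) {
           this.dropOutOfOrder = inputIsVersioned;
       }

       // Returns true if the update should be joined and forwarded, false if it is dropped.
       boolean accept(String key, long timestamp) {
           Long latest = latestTimestamp.get(key);
           if (dropOutOfOrder && latest != null && timestamp < latest) {
               return false; // older than the latest seen version for this key
           }
           if (latest == null || timestamp > latest) {
               latestTimestamp.put(key, timestamp);
           }
           return true;
       }
   }
   ```
   
   Either variant only decides which updates reach the result table; the result table's own semantics (timestamped by default) stay the same, which is the distinction drawn in the comment.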





[GitHub] [kafka] mjsax commented on a diff in pull request #13496: KAFKA-14834: [1/N] Add timestamped get to KTableValueGetter

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13496:
URL: https://github.com/apache/kafka/pull/13496#discussion_r1158792312


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java:
##########
@@ -25,5 +25,21 @@
 
     ValueAndTimestamp<V> get(K key);
 
+    /**
+     * Returns the latest record version, associated with the provided key, with timestamp
+     * not exceeding the provided timestamp bound. This method may only be called if
+     * {@link #isVersioned()} is true.
+     */
+    default ValueAndTimestamp<V> get(K key, long asOfTimestamp) {
+        throw new UnsupportedOperationException("get(key, timestamp) is only supported for versioned stores");
+    }
+
+    /**
+     * @return whether this value getter supports multiple record versions for the same key.
+     *         If true, then {@link #get(Object, long)} must be implemented. If not, then
+     *         {@link #get(Object, long)} must not be called.
+     */
+    boolean isVersioned();

Review Comment:
   As `get(key, asOfTs)` throws an exception by default, should we let `isVersioned` return `false` by default?
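
A minimal sketch of what this suggestion could look like, so that the two defaults agree with each other. `SketchValueGetter` and `MapValueGetter` are hypothetical, simplified stand-ins for the internal interface (values are plain `V` rather than `ValueAndTimestamp<V>`); only the method names and the exception message come from the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the internal KTableValueGetter interface.
interface SketchValueGetter<K, V> {
    V get(K key);

    // Timestamped lookup; only valid when isVersioned() returns true.
    default V get(K key, long asOfTimestamp) {
        throw new UnsupportedOperationException(
            "get(key, timestamp) is only supported for versioned stores");
    }

    // Defaulting to false keeps this consistent with the throwing default above.
    default boolean isVersioned() {
        return false;
    }
}

// A non-versioned getter backed by a plain map; it inherits both defaults.
final class MapValueGetter<K, V> implements SketchValueGetter<K, V> {
    private final Map<K, V> store = new HashMap<>();

    void put(K key, V value) {
        store.put(key, value);
    }

    @Override
    public V get(K key) {
        return store.get(key);
    }
}

final class DefaultIsVersionedDemo {
    public static void main(String[] args) {
        MapValueGetter<String, String> getter = new MapValueGetter<>();
        getter.put("k", "v");
        System.out.println(getter.get("k"));
        System.out.println(getter.isVersioned());
    }
}
```

The trade-off is that an abstract `isVersioned()` forces every implementation to state its choice explicitly, while a default of `false` lets non-versioned getters stay unchanged.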





[GitHub] [kafka] vcrfxia commented on a diff in pull request #13496: KAFKA-14834: [1/N] Add timestamped get to KTableValueGetter

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13496:
URL: https://github.com/apache/kafka/pull/13496#discussion_r1158941945


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java:
##########
@@ -153,11 +154,37 @@ public void init(final ProcessorContext<?, ?> context) {
 
         @Override
         public ValueAndTimestamp<VOut> get(final K key) {
-            final ValueAndTimestamp<V1> valueAndTimestamp1 = valueGetter1.get(key);
+            return computeJoin(key, valueGetter1::get, valueGetter2::get);
+        }
+
+        @Override
+        public ValueAndTimestamp<VOut> get(final K key, final long asOfTimestamp) {

Review Comment:
   > the value-getter should return the same thing as if the result state-store would have been materialized
   
   Why is this true? I agree that this is not happening for this particular value getter, but I don't understand why this is the property that we wish to preserve. 
   
   Wherever the KTable that is a result of a processor computation is materialized, then the value getter for that table uses the materialization directly. It's only when the result is not materialized where the value getter may need to compute results on the fly. When a join value getter needs to perform computation on the fly, this value getter can actually return correct results for a timestamped get (as long as both inputs are versioned), by performing timestamped lookups from both inputs and joining the result. 
   
   What you point out is that this join result may have never been emitted downstream, and that is true, but the two feel separate to me. In an "ideal" world I think we would emit all older join results downstream when joining versioned tables. (It's just not practical from a computation standpoint, and it's also not clear that users want this by default.) I guess I don't see why the fact that we did not emit the result downstream means we should not return it if asked to compute it on-the-fly. It seems like if we have the correct value available, then we should return it.
   
   It does lead to an interesting/confusing situation where if the result of a join between two versioned tables is materialized with a versioned store, then timestamped gets will not necessarily return correct results, even though correct results could be obtained by not materializing the join result and simply using the value getter instead. The alternative is to say that this value getter does not support timestamped gets, even though it does have a way to compute them (correctly), which feels odd. Curious to hear your thoughts on this.
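
A minimal sketch of the on-the-fly timestamped lookup described in this comment, assuming both inputs are versioned. `VersionedTable` and `OnTheFlyInnerJoinGetter` are hypothetical in-memory stand-ins, not the Kafka Streams classes; the actual getter works on `ValueAndTimestamp` and goes through the PR's `computeJoin` helper, whose body is not shown in this thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;

// Hypothetical in-memory versioned table: key -> (timestamp -> value).
final class VersionedTable<K, V> {
    private final Map<K, TreeMap<Long, V>> versions = new HashMap<>();

    void put(K key, V value, long timestamp) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    // Latest version with timestamp <= asOfTimestamp, or null if none exists.
    V get(K key, long asOfTimestamp) {
        TreeMap<Long, V> history = versions.get(key);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, V> entry = history.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }
}

// Hypothetical inner-join getter that recomputes the join result on the fly.
final class OnTheFlyInnerJoinGetter<K, V1, V2, VOut> {
    private final VersionedTable<K, V1> left;
    private final VersionedTable<K, V2> right;
    private final BiFunction<V1, V2, VOut> joiner;

    OnTheFlyInnerJoinGetter(VersionedTable<K, V1> left,
                            VersionedTable<K, V2> right,
                            BiFunction<V1, V2, VOut> joiner) {
        this.left = left;
        this.right = right;
        this.joiner = joiner;
    }

    // Timestamped lookups on both inputs, then join; null if either side is missing.
    VOut get(K key, long asOfTimestamp) {
        V1 leftValue = left.get(key, asOfTimestamp);
        V2 rightValue = right.get(key, asOfTimestamp);
        if (leftValue == null || rightValue == null) {
            return null;
        }
        return joiner.apply(leftValue, rightValue);
    }
}
```

Because the lookup reads the inputs' full history, it reflects out-of-order input records even when no join result was emitted downstream for them.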





[GitHub] [kafka] mjsax commented on a diff in pull request #13496: KAFKA-14834: [1/N] Add timestamped get to KTableValueGetter

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13496:
URL: https://github.com/apache/kafka/pull/13496#discussion_r1158814912


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java:
##########
@@ -153,11 +154,37 @@ public void init(final ProcessorContext<?, ?> context) {
 
         @Override
         public ValueAndTimestamp<VOut> get(final K key) {
-            final ValueAndTimestamp<V1> valueAndTimestamp1 = valueGetter1.get(key);
+            return computeJoin(key, valueGetter1::get, valueGetter2::get);
+        }
+
+        @Override
+        public ValueAndTimestamp<VOut> get(final K key, final long asOfTimestamp) {

Review Comment:
   Wondering if this is correct? This "ValueGetter" queries the (non-materialized) result table of an inner join, and re-computes the result on-the-fly? Maybe it will become clear in a follow-up PR, but I would love to get a short / high-level explanation of why it's the right thing to do (as right now I don't understand it).
   
   What I am worried about is the case where we don't update the result for out-of-order updates, but when we re-compute the result on the fly, we might re-compute an "updated" result, and thus don't return the same record as previously emitted.
   
   You call this out in the second example of the "Proposal" section on the KIP:
   ```
   A: (timestamp = 0, value = a0)
   B: (timestamp = 2, value = b2) -> latest record on B side, emits (a0, b2) with timestamp=2
   A: (timestamp = 5, value = a5) -> latest record on A side, emits (a5, b2) with timestamp=5
   A: (timestamp = 1, value = a1) -> out-of-order record on A side, no new join result emitted. the older (a0, b2) join result is no longer correct.
   ```
   
   We don't emit anything for the last record, while the older join result becomes invalid (that's ok); but if we call `get(key,1)` later, would we get back the older (not updated) `(a0,b2)` (which would be correct to return from a consistency point of view because the value-getter should return the same thing as if the result state-store would have been materialized, and the value-getter is not in use), or would we get back a now updated result `(a1,b2)`?
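
The difference in question can be replayed with a small single-key model of the example above. This is a hypothetical sketch, not Kafka Streams code: it assumes join results are stamped with the larger of the two input timestamps and that out-of-order records update the input history without emitting. It queries as of timestamp 3, because in this simplified model no B-side version exists at or before timestamp 1.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-key model of the KIP-914 example; not Kafka Streams code.
final class JoinHistorySketch {
    // Versioned inputs and materialized result, each as timestamp -> value.
    final TreeMap<Long, String> a = new TreeMap<>();
    final TreeMap<Long, String> b = new TreeMap<>();
    final TreeMap<Long, String> materialized = new TreeMap<>();

    // Latest version with timestamp <= bound, or null if there is none.
    static String lookup(TreeMap<Long, String> table, long bound) {
        Map.Entry<Long, String> entry = table.floorEntry(bound);
        return entry == null ? null : entry.getValue();
    }

    static String join(String left, String right) {
        return "(" + left + ", " + right + ")";
    }

    // Processes one input record; out-of-order records update the input only.
    void process(TreeMap<Long, String> side, long ts, String value) {
        boolean outOfOrder = !side.isEmpty() && ts < side.lastKey();
        side.put(ts, value);
        if (outOfOrder || a.isEmpty() || b.isEmpty()) {
            return; // no join result is emitted
        }
        long resultTs = Math.max(a.lastKey(), b.lastKey());
        materialized.put(resultTs, join(a.lastEntry().getValue(), b.lastEntry().getValue()));
    }

    // Lookup against the materialized result: returns what was emitted.
    String getMaterialized(long bound) {
        return lookup(materialized, bound);
    }

    // On-the-fly lookup: joins the timestamped lookups of both inputs.
    String getOnTheFly(long bound) {
        String left = lookup(a, bound);
        String right = lookup(b, bound);
        return left == null || right == null ? null : join(left, right);
    }

    static JoinHistorySketch replayExample() {
        JoinHistorySketch s = new JoinHistorySketch();
        s.process(s.a, 0, "a0");
        s.process(s.b, 2, "b2"); // emits (a0, b2) at timestamp 2
        s.process(s.a, 5, "a5"); // emits (a5, b2) at timestamp 5
        s.process(s.a, 1, "a1"); // out of order, nothing emitted
        return s;
    }

    public static void main(String[] args) {
        JoinHistorySketch s = replayExample();
        System.out.println("materialized as of 3: " + s.getMaterialized(3));
        System.out.println("on the fly as of 3:   " + s.getOnTheFly(3));
    }
}
```

Under these assumptions the materialized lookup returns the previously emitted `(a0, b2)`, while the on-the-fly lookup returns the updated `(a1, b2)`.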





[GitHub] [kafka] mjsax commented on a diff in pull request #13496: KAFKA-14834: [1/N] Add timestamped get to KTableValueGetter

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13496:
URL: https://github.com/apache/kafka/pull/13496#discussion_r1160438018


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java:
##########
@@ -153,11 +154,37 @@ public void init(final ProcessorContext<?, ?> context) {
 
         @Override
         public ValueAndTimestamp<VOut> get(final K key) {
-            final ValueAndTimestamp<V1> valueAndTimestamp1 = valueGetter1.get(key);
+            return computeJoin(key, valueGetter1::get, valueGetter2::get);
+        }
+
+        @Override
+        public ValueAndTimestamp<VOut> get(final K key, final long asOfTimestamp) {

Review Comment:
   > I don't understand why this is the property that we wish to preserve.
   
   Because materialization vs non-materialization should be a non-functional (ie, non semantical) change, but only a perf/resource footprint change. It's like any other non-functional config change.
   
   We decided to not correct older (potentially) incorrect results if out-of-order data arrives. Thus, if we materialize the result, as you point out in your last paragraph, we would always use a potentially incorrect result on lookup. Thus, we should do the same thing for the non-materialized case.
   
   This actually raises an interesting question, as discussed in person: if the result table is versioned, it's always materialized. And if it's not materialized, it would actually always be non-versioned (because the user has no way to tell us to make the result table versioned without forcing a materialization). And thus, if we do any `get()` on the non-versioned result table, we would never do a `get(k,ts)` anyway, so maybe in practice there is no issue?





[GitHub] [kafka] vcrfxia commented on a diff in pull request #13496: KAFKA-14834: [1/N] Add timestamped get to KTableValueGetter

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13496:
URL: https://github.com/apache/kafka/pull/13496#discussion_r1160829732


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java:
##########
@@ -153,11 +154,37 @@ public void init(final ProcessorContext<?, ?> context) {
 
         @Override
         public ValueAndTimestamp<VOut> get(final K key) {
-            final ValueAndTimestamp<V1> valueAndTimestamp1 = valueGetter1.get(key);
+            return computeJoin(key, valueGetter1::get, valueGetter2::get);
+        }
+
+        @Override
+        public ValueAndTimestamp<VOut> get(final K key, final long asOfTimestamp) {

Review Comment:
   > Because materialization vs non-materialization should be a non-functional (ie, non semantical) change, but only a perf/resource footprint change.
   
   Hm, maybe I'm not understanding what's considered a "functional" change and what isn't. For example, is caching a functional change? Because versioned stores aside for a moment, if a table processor is materialized with a non-versioned store with caching enabled, then the downstream will see a different set of results than if the table were not materialized / caching was not enabled. In either case, the guarantee is that the latest (by offset) record is the "correct" value for the key, and the others should be effectively ignored. 
   
   Now turning to versioned stores, the only guarantee today when joining two versioned tables is that the latest (by timestamp and offset) result is correct and the others should be ignored. (In the future, we could expand this guarantee by also emitting correct older join results, but we don't do this today.) But we still have to answer the question of what happens if a "user" tries to query for an older join result (via the KTableValueGetter) -- we don't guarantee that the result is correct, but they could still do this. If the result table is materialized (as a versioned store), then they may get incorrect results. If the result table is not materialized, then it cannot be versioned (as you point out) and so they technically can't do this (at least from the join merge node), but if they somehow had access to the inner/left/outer join processor (before the join merge processor) and called `get(key, ts)` on the KTableValueGetter, what should be returned? I think it's natural to return the correct older join result because we have it available. If you prefer to make the case to say that we should simply disallow this, based on the fact that versioned joins do not guarantee correct older join results today, I think that's fine too but we'll have to reimplement this same logic in the future when we do support correct older join history. And having the logic here doesn't introduce any incorrectness (IMO) in the meantime, so it seems better to leave it?


