Posted to jira@kafka.apache.org by "vamossagar12 (via GitHub)" <gi...@apache.org> on 2023/05/18 07:11:10 UTC

[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13682: MINOR: improved exception/warn logging for stream-stream join store settings

vamossagar12 commented on code in PR #13682:
URL: https://github.com/apache/kafka/pull/13682#discussion_r1197482096


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java:
##########
@@ -242,13 +249,21 @@ public <K, V1, V2, VOut> KStream<K, VOut> join(final KStream<K, V1> lhs,
 
     private void assertWindowSettings(final WindowBytesStoreSupplier supplier, final JoinWindows joinWindows) {
         if (!supplier.retainDuplicates()) {
-            throw new StreamsException("The StoreSupplier must set retainDuplicates=true, found retainDuplicates=false");
+            throw new StreamsException(String.format(
+              "The StoreSupplier for join store %s must set retainDuplicates = true, found retainDuplicates = false",
+              supplier.name()));
+        }
+        if (supplier.retentionPeriod() != joinWindows.size() + joinWindows.gracePeriodMs()) {
+            throw new StreamsException(String.format(
+              "The StoreSupplier for join store %s must set retentionPeriod = windowSize + gracePeriod,"
+                + " found retentionPeriod = %d, joinWindows.size = %d, joinWindows.gracePeriod = %d",
+              supplier.name(), supplier.retentionPeriod(), joinWindows.size(), joinWindows.gracePeriodMs()));
         }
-        final boolean allMatch = supplier.retentionPeriod() == (joinWindows.size() + joinWindows.gracePeriodMs()) &&
-            supplier.windowSize() == joinWindows.size();
-        if (!allMatch) {
-            throw new StreamsException(String.format("Window settings mismatch. WindowBytesStoreSupplier settings %s must match JoinWindows settings %s" +
-                                                         " for the window size and retention period", supplier, joinWindows));
+        if (supplier.windowSize() != joinWindows.size()) {
+            throw new StreamsException(String.format(
+              "The StoreSupplier for join store %s must set supplier.windowSize = joinWindows.windowSize,"
+                + " found supplier.windowSize = %d, joinWindows.size = %d",
+              supplier.name(), supplier.windowSize(), joinWindows.size()));

Review Comment:
   @ableegoldman, I am wondering whether the exception message could be rephrased to something like:
   `Incorrect value {} used for config {} for StoreSupplier for join store. The correct value should be {}`. Do you think that reads as slightly more concise?
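
To make the suggestion concrete, here is a minimal, standalone sketch of how the one message shape could cover each mismatched setting. The class `JoinStoreMessages` and the helper `incorrectSetting` are hypothetical names used for illustration only and are not part of this PR or of Kafka Streams. The `{}` placeholders are written as `String.format` specifiers, since that is what the existing code in the diff uses.

```java
import java.util.Locale;

// Hypothetical illustration only: neither this class nor this helper exists in Kafka Streams.
final class JoinStoreMessages {

    private JoinStoreMessages() { }

    // Builds the suggested message for a single mismatched store setting.
    // config   - name of the setting, e.g. "retentionPeriod" or "windowSize"
    // found    - value configured on the store supplier
    // expected - value implied by the JoinWindows settings
    static String incorrectSetting(final String config, final long found, final long expected) {
        return String.format(
            Locale.ROOT,
            "Incorrect value %d used for config %s for StoreSupplier for join store."
                + " The correct value should be %d",
            found, config, expected);
    }

    public static void main(final String[] args) {
        // Example values (assumed): windowSize = 1000 ms, gracePeriod = 500 ms,
        // but the supplier was created with retentionPeriod = 1000 ms.
        final long windowSize = 1000L;
        final long gracePeriod = 500L;
        final long retentionPeriod = 1000L;

        if (retentionPeriod != windowSize + gracePeriod) {
            System.out.println(
                incorrectSetting("retentionPeriod", retentionPeriod, windowSize + gracePeriod));
        }
    }
}
```

One trade-off of this shape is that it drops the store name that the messages in the diff include via `supplier.name()`; the name could be added as a further argument if it is worth keeping.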



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.