Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/13 01:11:55 UTC

[GitHub] [kafka] ableegoldman opened a new pull request #9174: KAFKA-10395: relax output topic check in TTD to work with dynamic routing

ableegoldman opened a new pull request #9174:
URL: https://github.com/apache/kafka/pull/9174


   I went through all five stages of grief thinking about what to do here and decided the best option was to just relax the check after all. Hopefully users who find their output topic unexpectedly empty because of a typo in the topic name will be able to figure it out quickly from the warning we now log instead.
   
   Not sure how rampant the problem of output topic typos is to begin with...
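   To make the behaviour change concrete, here is a simplified, hypothetical model of the output-topic lookup before and after this PR. It is not the `TopologyTestDriver` code: records are plain `String`s, the sink topics are a `Set<String>`, warnings are collected in a list instead of going to an SLF4J logger, and the names `RecordsQueueModel`, `getRecordsQueueOld`, and `getRecordsQueueNew` are invented for illustration. In this sketch an unknown topic yields an empty queue; the diff does not show how the real driver's callers handle a missing queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical, simplified model of the output-topic lookup; not the real TopologyTestDriver code.
final class RecordsQueueModel {
    private final Map<String, Queue<String>> outputRecordsByTopic;
    private final Set<String> sinkTopics;
    // Stand-in for log.warn(...): collects (shortened) warning messages so they can be inspected.
    final List<String> warnings = new ArrayList<>();

    RecordsQueueModel(final Map<String, Queue<String>> outputRecordsByTopic, final Set<String> sinkTopics) {
        this.outputRecordsByTopic = outputRecordsByTopic;
        this.sinkTopics = sinkTopics;
    }

    // Old behaviour: a topic with no output that is not a static sink topic is rejected outright.
    Queue<String> getRecordsQueueOld(final String topicName) {
        final Queue<String> outputRecords = outputRecordsByTopic.get(topicName);
        if (outputRecords == null && !sinkTopics.contains(topicName)) {
            throw new IllegalArgumentException("Unknown topic: " + topicName);
        }
        return outputRecords == null ? new ArrayDeque<>() : outputRecords;
    }

    // New behaviour: the same situation only produces a warning, because a
    // TopicNameExtractor may still route records to this topic later.
    Queue<String> getRecordsQueueNew(final String topicName) {
        final Queue<String> outputRecords = outputRecordsByTopic.get(topicName);
        if (outputRecords == null && !sinkTopics.contains(topicName)) {
            warnings.add("Unrecognized topic: " + topicName);
        }
        return outputRecords == null ? new ArrayDeque<>() : outputRecords;
    }
}
```

   With `output` as the only sink topic, `getRecordsQueueOld("dynamic-topic")` throws, while `getRecordsQueueNew("dynamic-topic")` returns an empty queue and records one warning; asking for `output` itself warns in neither version.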


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei merged pull request #9174: KAFKA-10395: relax output topic check in TTD to work with dynamic routing

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #9174:
URL: https://github.com/apache/kafka/pull/9174


   


----------------------------------------------------------------



[GitHub] [kafka] ableegoldman commented on a change in pull request #9174: KAFKA-10395: relax output topic check in TTD to work with dynamic routing

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9174:
URL: https://github.com/apache/kafka/pull/9174#discussion_r470374291



##########
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
##########
@@ -805,10 +805,10 @@ public void advanceWallClockTime(final Duration advance) {
 
     private Queue<ProducerRecord<byte[], byte[]>> getRecordsQueue(final String topicName) {
         final Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(topicName);
-        if (outputRecords == null) {
-            if (!processorTopology.sinkTopics().contains(topicName)) {
-                throw new IllegalArgumentException("Unknown topic: " + topicName);
-            }
+        if (outputRecords == null && !processorTopology.sinkTopics().contains(topicName)) {

Review comment:
       Yeah, that's a good point. I took a quick look at this, and personally I feel it's not worth adding even more things for the internal topology to track. But it wouldn't really add any complexity, so if this is a big concern I can go ahead and add a flag to watch out for any non-static TopicNameExtractors.




----------------------------------------------------------------



[GitHub] [kafka] mjsax commented on pull request #9174: KAFKA-10395: relax output topic check in TTD to work with dynamic routing

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #9174:
URL: https://github.com/apache/kafka/pull/9174#issuecomment-679347362


   Might it be worth cherry-picking this to `2.6`?


----------------------------------------------------------------



[GitHub] [kafka] abbccdda commented on a change in pull request #9174: KAFKA-10395: relax output topic check in TTD to work with dynamic routing

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9174:
URL: https://github.com/apache/kafka/pull/9174#discussion_r469713175



##########
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
##########
@@ -805,10 +805,10 @@ public void advanceWallClockTime(final Duration advance) {
 
     private Queue<ProducerRecord<byte[], byte[]>> getRecordsQueue(final String topicName) {
         final Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(topicName);
-        if (outputRecords == null) {
-            if (!processorTopology.sinkTopics().contains(topicName)) {
-                throw new IllegalArgumentException("Unknown topic: " + topicName);
-            }
+        if (outputRecords == null && !processorTopology.sinkTopics().contains(topicName)) {

Review comment:
       Seems good enough as a bug fix, but I was wondering if we could detect whether a dynamic topic is configured, to make sure we aren't letting other bugs slip through in TTD.

##########
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
##########
@@ -805,10 +805,10 @@ public void advanceWallClockTime(final Duration advance) {
 
     private Queue<ProducerRecord<byte[], byte[]>> getRecordsQueue(final String topicName) {
         final Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(topicName);
-        if (outputRecords == null) {
-            if (!processorTopology.sinkTopics().contains(topicName)) {
-                throw new IllegalArgumentException("Unknown topic: " + topicName);
-            }
+        if (outputRecords == null && !processorTopology.sinkTopics().contains(topicName)) {
+            log.warn("Unrecognized topic: {}, this can occur if dynamic routing is used and no output has been "
+                         + "sent to this topic yet. If not using a TopicNameExtractor, check that the output topic "
+                         + "is correct. ", topicName);

Review comment:
       nit: we could drop the trailing space after `is correct.`




----------------------------------------------------------------



[GitHub] [kafka] vvcephei commented on a change in pull request #9174: KAFKA-10395: relax output topic check in TTD to work with dynamic routing

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9174:
URL: https://github.com/apache/kafka/pull/9174#discussion_r472242890



##########
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
##########
@@ -805,10 +805,10 @@ public void advanceWallClockTime(final Duration advance) {
 
     private Queue<ProducerRecord<byte[], byte[]>> getRecordsQueue(final String topicName) {
         final Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(topicName);
-        if (outputRecords == null) {
-            if (!processorTopology.sinkTopics().contains(topicName)) {
-                throw new IllegalArgumentException("Unknown topic: " + topicName);
-            }
+        if (outputRecords == null && !processorTopology.sinkTopics().contains(topicName)) {

Review comment:
       Ok, let's just keep it in our back pocket for now.




----------------------------------------------------------------



[GitHub] [kafka] ableegoldman commented on pull request #9174: KAFKA-10395: relax output topic check in TTD to work with dynamic routing

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9174:
URL: https://github.com/apache/kafka/pull/9174#issuecomment-673189749


   @cadonna @guozhangwang 


----------------------------------------------------------------



[GitHub] [kafka] vvcephei commented on a change in pull request #9174: KAFKA-10395: relax output topic check in TTD to work with dynamic routing

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9174:
URL: https://github.com/apache/kafka/pull/9174#discussion_r470193578



##########
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
##########
@@ -805,10 +805,10 @@ public void advanceWallClockTime(final Duration advance) {
 
     private Queue<ProducerRecord<byte[], byte[]>> getRecordsQueue(final String topicName) {
         final Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(topicName);
-        if (outputRecords == null) {
-            if (!processorTopology.sinkTopics().contains(topicName)) {
-                throw new IllegalArgumentException("Unknown topic: " + topicName);
-            }
+        if (outputRecords == null && !processorTopology.sinkTopics().contains(topicName)) {

Review comment:
       We could detect whether the processorTopology contains only `StaticTopicNameExtractor`s and still throw in that case if the topic name isn't in the topology.
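       A minimal sketch of that suggestion, under stated assumptions: each sink is modelled by a hypothetical `TopicChooser` interface standing in for `TopicNameExtractor`, and a hypothetical `FixedTopic` class plays the role of `StaticTopicNameExtractor`. If every sink is a `FixedTopic`, the set of possible output topics is known up front and the strict check can stay; as soon as one sink routes dynamically, the driver can only warn. None of these names are Kafka APIs, this check was not part of the merged PR, and the "no records produced yet" condition from the real check is left out to keep the focus on routing.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for TopicNameExtractor: picks the output topic for a record.
interface TopicChooser {
    String topicFor(Object key, Object value);
}

// Hypothetical stand-in for StaticTopicNameExtractor: always returns the same topic.
final class FixedTopic implements TopicChooser {
    final String topic;

    FixedTopic(final String topic) {
        this.topic = topic;
    }

    @Override
    public String topicFor(final Object key, final Object value) {
        return topic;
    }
}

final class OutputTopicCheck {
    private OutputTopicCheck() { }

    // True only if no sink can choose its topic per record.
    static boolean onlyStaticRouting(final List<TopicChooser> sinks) {
        return sinks.stream().allMatch(s -> s instanceof FixedTopic);
    }

    // Accepts any fixed sink topic. For other topics: throws if routing is fully static
    // (the name must be wrong), otherwise only records a warning.
    static void check(final List<TopicChooser> sinks, final String topicName, final List<String> warnings) {
        final Set<String> fixedTopics = sinks.stream()
            .filter(s -> s instanceof FixedTopic)
            .map(s -> ((FixedTopic) s).topic)
            .collect(Collectors.toSet());
        if (fixedTopics.contains(topicName)) {
            return;
        }
        if (onlyStaticRouting(sinks)) {
            throw new IllegalArgumentException("Unknown topic: " + topicName);
        }
        warnings.add("Unrecognized topic: " + topicName);
    }
}
```

       With only `new FixedTopic("out")` as a sink, a typo such as `"otu"` still throws; once a lambda like `(k, v) -> "topic-" + v` is added as a second sink, the same lookup only produces a warning.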




----------------------------------------------------------------



[GitHub] [kafka] mjsax commented on pull request #9174: KAFKA-10395: relax output topic check in TTD to work with dynamic routing

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #9174:
URL: https://github.com/apache/kafka/pull/9174#issuecomment-681137931


   Cherry-picked to `2.6` branch.


----------------------------------------------------------------



[GitHub] [kafka] ableegoldman commented on a change in pull request #9174: KAFKA-10395: relax output topic check in TTD to work with dynamic routing

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9174:
URL: https://github.com/apache/kafka/pull/9174#discussion_r470134136



##########
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
##########
@@ -805,10 +805,10 @@ public void advanceWallClockTime(final Duration advance) {
 
     private Queue<ProducerRecord<byte[], byte[]>> getRecordsQueue(final String topicName) {
         final Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(topicName);
-        if (outputRecords == null) {
-            if (!processorTopology.sinkTopics().contains(topicName)) {
-                throw new IllegalArgumentException("Unknown topic: " + topicName);
-            }
+        if (outputRecords == null && !processorTopology.sinkTopics().contains(topicName)) {

Review comment:
       What do you mean by "configured"? My understanding is that topics don't actually even "exist" in the TTD: we just use a MockProducer and grab output records from its history to enqueue in the `outputRecordsByTopic` map. So if an output topic hasn't been sent to yet, there's no way to say whether it exists or not.
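       The capture step described above can be sketched as follows. This is a hypothetical model rather than driver code: `SentRecord` stands in for `ProducerRecord<byte[], byte[]>`, and the `List<SentRecord>` argument plays the role of the mock producer's send history (in the spirit of `MockProducer#history()`, which the sketch does not call). The point is that a topic only gets an entry in `outputRecordsByTopic` once something has been sent to it, so a missing key cannot distinguish "not written to yet" from "misspelled".

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical stand-in for a produced record: just a topic name and a value.
final class SentRecord {
    final String topic;
    final String value;

    SentRecord(final String topic, final String value) {
        this.topic = topic;
        this.value = value;
    }
}

final class OutputCapture {
    final Map<String, Queue<SentRecord>> outputRecordsByTopic = new HashMap<>();

    // Moves records from the send history into per-topic queues, creating a queue
    // for a topic only when the first record for that topic shows up.
    void capture(final List<SentRecord> history) {
        for (final SentRecord record : history) {
            outputRecordsByTopic.computeIfAbsent(record.topic, t -> new ArrayDeque<>()).add(record);
        }
    }
}
```

       After capturing a history that only contains records for `orders-eu`, `outputRecordsByTopic.get("orders-us")` is `null`, whether `orders-us` is a dynamic destination that simply hasn't received anything yet or a typo.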




----------------------------------------------------------------



[GitHub] [kafka] cadonna commented on pull request #9174: KAFKA-10395: relax output topic check in TTD to work with dynamic routing

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #9174:
URL: https://github.com/apache/kafka/pull/9174#issuecomment-673610380


   Failures seem related.


----------------------------------------------------------------



[GitHub] [kafka] ableegoldman commented on pull request #9174: KAFKA-10395: relax output topic check in TTD to work with dynamic routing

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9174:
URL: https://github.com/apache/kafka/pull/9174#issuecomment-679373369


   @mjsax Be my guest 🙂 

