Posted to jira@kafka.apache.org by "Guozhang Wang (Jira)" <ji...@apache.org> on 2021/03/24 17:43:00 UTC
[jira] [Commented] (KAFKA-12542) Unknown Output topic when Filter
Processor is returning false
[ https://issues.apache.org/jira/browse/KAFKA-12542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17308053#comment-17308053 ]
Guozhang Wang commented on KAFKA-12542:
---------------------------------------
Hi [~sinhash]
I cannot tell whether the `outputTopic` in `dlqTopic:outputTopic` is indeed named `OUTPUT_TOPIC`; I suspect there may be some mis-naming?
BTW, I'd suggest sending such questions to the mailing list, where you can get more visibility and help; the JIRA system is usually for reporting confirmed bugs.
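One way to check the naming, sketched here in plain Java with no Kafka on the classpath: the snippet mirrors the ternary from the `.to(...)` call and lists the topic names it can return. The `route` helper, the `AvroDTO` stand-in class, and the literal values `DLQ_TOPIC` and `OUTPUT_TOPIC` are assumptions for illustration, not taken from the reporter's code. The name passed to `createOutputTopic` in the test should be one of the names this expression can actually produce.

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class TopicNameCheck {
    // Hypothetical stand-in for the reporter's Avro-generated class.
    static class AvroDTO {}

    // Mirrors `v instanceof AvroDTO ? dlqTopic : outputTopic` from the report.
    static String route(Object value, String dlqTopic, String outputTopic) {
        return value instanceof AvroDTO ? dlqTopic : outputTopic;
    }

    public static void main(String[] args) {
        // Assumed values; in the real application these come from configuration.
        String dlqTopic = "DLQ_TOPIC";
        String outputTopic = "OUTPUT_TOPIC";

        // Collect every topic name the routing expression can return.
        Set<String> reachable = new LinkedHashSet<>();
        reachable.add(route(new AvroDTO(), dlqTopic, outputTopic));
        reachable.add(route("not a DTO", dlqTopic, outputTopic));

        // The topic name used by the test must be in this set.
        System.out.println("Reachable topics: " + reachable);
        System.out.println("Test topic reachable: " + reachable.contains("OUTPUT_TOPIC"));
    }
}
```

If the configured value of `outputTopic` differs from the literal used in the test, the second line prints `false`, which would point to the mis-naming suspected above.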
> Unknown Output topic when Filter Processor is returning false
> -------------------------------------------------------------
>
> Key: KAFKA-12542
> URL: https://issues.apache.org/jira/browse/KAFKA-12542
> Project: Kafka
> Issue Type: Bug
> Components: streams-test-utils
> Affects Versions: 2.6.0
> Reporter: SHWETA SINHA
> Priority: Major
>
> I am using the Kafka Streams DSL to create a topology.
> StreamsBuilder streamsBuilder = new StreamsBuilder();
> valueSerde.configure(getConfigForSpecificAvro(appId), false);
> KStream<String, AvroDTO> stream = streamsBuilder.stream(inputTopic, Consumed.with(new Serdes.StringSerde(), valueSerde));
> KStream<String, AvroDTO> filtered = stream.filter((key, value) -> ServiceConsumer.filter(key, value));
> filtered.map((KeyValueMapper<String, AvroDTO, KeyValue<String, SpecificRecordBase>>) (key, value) -> ServiceConsumer.process(key, value))
> .to((k, v, recordContext) -> v instanceof AvroDTO ? dlqTopic:outputTopic, Produced.with(new Serdes.StringSerde(), valueSerde));
> Topology topology = streamsBuilder.build();
> KafkaStreams kafkaStreams = new KafkaStreams(topology, getKafkaStreamsConfig(appId));
>
> To test the topology, I am using TopologyTestDriver.
> when(getKafkaStreamsConfig(any())).thenReturn(kafkaConfig);
> when(ServiceConsumer.filter(any(), any())).thenReturn(false);
> // when(ServiceConsumer.process(any(), any())).thenReturn(new KeyValue<>(statusDto.getTaskId().toString(), statusDto));
> topologyTestDriver = new TopologyTestDriver(getTopologyAndStartKafkaStreams(), kafkaConfig);
> StreamInput = topologyTestDriver.createInputTopic("INPUT_TOPIC", new StringSerializer(), new KafkaAvroSerializer(schemaRegistryClient));
> UpdateOutput = topologyTestDriver.createOutputTopic("OUTPUT_TOPIC", new StringDeserializer(), new KafkaAvroDeserializer(schemaRegistryClient, config));
> StreamInput.pipeInput("Hi");
> assertThat(UpdateOutput.isEmpty()).isTrue();
>
> I am checking that my output topic is empty when no messages pass the filter.
> I get the following error while unit testing:
> java.lang.IllegalArgumentException: Unknown topic: OUTPUT_TOPIC
>
> Changing when(ServiceConsumer.filter(any(),any())).thenReturn(false); to return true does not give any error.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)