Posted to jira@kafka.apache.org by "Guozhang Wang (Jira)" <ji...@apache.org> on 2021/03/24 17:43:00 UTC

[jira] [Commented] (KAFKA-12542) Unknown Output topic when Filter Processor is returning false

    [ https://issues.apache.org/jira/browse/KAFKA-12542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17308053#comment-17308053 ] 

Guozhang Wang commented on KAFKA-12542:
---------------------------------------

Hi [~sinhash]

I cannot tell whether the topic that `dlqTopic:outputTopic` resolves to is actually named `OUTPUT_TOPIC`; I suspect there may be a naming mismatch.
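
One way to check the naming without running Kafka at all is sketched below in plain Java. It only mirrors the shape of the routing lambda from the issue (`v instanceof AvroDTO ? dlqTopic : outputTopic`). The `Value`, `AvroDTO` and `OtherRecord` types, the `TopicNameCheck` class, and the values `"DLQ_TOPIC"` and `"OUTPUT_TOPIC"` are all hypothetical stand-ins, since the issue does not show what `dlqTopic` and `outputTopic` actually hold.

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class TopicNameCheck {
    // Hypothetical stand-ins for the reporter's value types (not from the issue).
    interface Value {}
    static final class AvroDTO implements Value {}
    static final class OtherRecord implements Value {}

    // Same decision as the lambda passed to .to(...) in the issue:
    // AvroDTO values go to the DLQ topic, everything else to the output topic.
    static String route(Value value, String dlqTopic, String outputTopic) {
        return value instanceof AvroDTO ? dlqTopic : outputTopic;
    }

    // Collects every topic name the routing decision can resolve to.
    static Set<String> reachableTopics(String dlqTopic, String outputTopic) {
        Set<String> names = new LinkedHashSet<>();
        names.add(route(new AvroDTO(), dlqTopic, outputTopic));
        names.add(route(new OtherRecord(), dlqTopic, outputTopic));
        return names;
    }

    public static void main(String[] args) {
        // Assumed values; substitute whatever the application really configures.
        Set<String> names = reachableTopics("DLQ_TOPIC", "OUTPUT_TOPIC");
        // The name used in the test should be one of the reachable names.
        System.out.println(names.contains("OUTPUT_TOPIC"));
    }
}
```

If the name the test passes to `createOutputTopic` is not among the names the application's variables resolve to, that would point to the mis-naming suspected above.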

BTW, I'd suggest sending such questions to the mailing list, where they get more visibility and more people can help; the JIRA system is usually for reporting confirmed bugs.

> Unknown Output topic when Filter Processor is returning false
> -------------------------------------------------------------
>
>                 Key: KAFKA-12542
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12542
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams-test-utils
>    Affects Versions: 2.6.0
>            Reporter: SHWETA SINHA
>            Priority: Major
>
> I am using the Kafka Streams DSL to create the topology.
> StreamsBuilder streamsBuilder=new StreamsBuilder();
> valueSerde.configure(getConfigForSpecificAvro(appId),false);
> KStream<String, AvroDTO> stream = streamsBuilder.stream(inputTopic, Consumed.with(new Serdes.StringSerde(), valueSerde));
> KStream<String, AvroDTO> filtered = stream.filter((key, value) -> ServiceConsumer.filter(key, value));
> filtered.map((KeyValueMapper<String, AvroDTO, KeyValue<String, SpecificRecordBase>>) (key, value) -> ServiceConsumer.process(key, value))
>  .to((k,v,recordContext) -> v instanceof AvroDTO? dlqTopic:outputTopic,Produced.with(new Serdes.StringSerde(), valueSerde));
> Topology topology=streamsBuilder.build();
> KafkaStreams kafkaStreams=new KafkaStreams(topology,getKafkaStreamsConfig(appId));
>  
> To test the topology, I am using TopologyTestDriver.
> when(getKafkaStreamsConfig(any())).thenReturn(kafkaConfig);
>  when(ServiceConsumer.filter(any(),any())).thenReturn(false);
> // when(ServiceConsumer.process(any(),any())).thenReturn(new KeyValue<>(statusDto.getTaskId().toString(),statusDto));
>  topologyTestDriver=new TopologyTestDriver(getTopologyAndStartKafkaStreams(),kafkaConfig);
> StreamInput = topologyTestDriver.createInputTopic("INPUT_TOPIC", new StringSerializer(), new KafkaAvroSerializer(schemaRegistryClient));
> UpdateOutput =topologyTestDriver.createOutputTopic("OUTPUT_TOPIC",new StringDeserializer(),new KafkaAvroDeserializer(schemaRegistryClient,config));
> StreamInput.pipeInput("Hi");
> assertThat(UpdateOutput.isEmpty()).isTrue();
>  
> I am checking that the output topic is empty when no messages pass the filter.
> I get the following error while unit testing:
> java.lang.IllegalArgumentException: Unknown topic: OUTPUT_TOPIC
>  
> Changing when(ServiceConsumer.filter(any(),any())).thenReturn(false); to return true does not give any error.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)