You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Guozhang Wang (JIRA)" <ji...@apache.org> on 2017/01/25 03:26:27 UTC

[jira] [Updated] (KAFKA-4408) KTable doesn't work with ProcessorTopologyTestDriver in Kafka 0.10.1.0

     [ https://issues.apache.org/jira/browse/KAFKA-4408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-4408:
---------------------------------
    Labels: newbie unit-test  (was: )

> KTable doesn't work with ProcessorTopologyTestDriver in Kafka 0.10.1.0
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-4408
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4408
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.0
>         Environment: Linux
>            Reporter: Byron Nikolaidis
>              Labels: newbie, unit-test
>
> In Kafka 0.10.1.0, the ProcessorTopologyTestDriver no longer works with KTables.  The test code below worked fine under Kafka 0.10.0.1 but now fails with the following error:
> Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: task [0_0] Could not find partition info for topic: alertInputTopic
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:174)
>         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
>         at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175)
>         at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
>         at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62)
>         at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
>         at org.apache.kafka.test.ProcessorTopologyTestDriver.<init>(ProcessorTopologyTestDriver.java:174)
>         at mil.navy.icap.kafka.streams.processor.track.ProcessorDriverTest2.main(ProcessorDriverTest2.java:41)
> {code}
> package mil.navy.icap.kafka.streams.processor.track;
> import java.io.IOException;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.Serdes.StringSerde;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import org.apache.kafka.common.serialization.StringSerializer;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import org.apache.kafka.streams.kstream.KTable;
> import org.apache.kafka.test.ProcessorTopologyTestDriver;
> public class ProcessorDriverTest2 {
>  
>  public static void main(String[] args) throws IOException, InterruptedException {
>  System.out.println("ProcessorDriverTest2");
>  
>  Properties props = new Properties();
>  props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "ProcessorDriverTest2");
>  props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>  props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>  props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>  props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
>  props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
>  
>  StreamsConfig streamsConfig = new StreamsConfig(props);
>  
>  // topology
>  KStreamBuilder kstreamBuilder = new KStreamBuilder();
>  StringSerde stringSerde = new StringSerde();
>  KTable<String, String> table = kstreamBuilder.table(stringSerde,
>  stringSerde, "alertInputTopic");
>  table.to(stringSerde, stringSerde, "alertOutputTopic");
>  
>  // create test driver
>  ProcessorTopologyTestDriver testDriver = new ProcessorTopologyTestDriver(
>  streamsConfig, 
>  kstreamBuilder, 
>  "alertStore");
>  StringSerializer serializer = new StringSerializer();
>  StringDeserializer deserializer = new StringDeserializer();
>  // send data to input topic
>  testDriver.process("alertInputTopic", 
>  "the Key", "the Value", serializer, serializer);
>  
>  // read data from output topic
>  ProducerRecord<String, String> rec = testDriver.readOutput("alertOutputTopic", 
>  deserializer, deserializer);
>  
>  System.out.println("rec: " + rec);
>  }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)