You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Valentino Proietti (JIRA)" <ji...@apache.org> on 2018/04/03 10:08:00 UTC

[jira] [Created] (KAFKA-6742) TopologyTestDriver error when dealing with stores from GlobalKTable

Valentino Proietti created KAFKA-6742:
-----------------------------------------

             Summary: TopologyTestDriver error when dealing with stores from GlobalKTable
                 Key: KAFKA-6742
                 URL: https://issues.apache.org/jira/browse/KAFKA-6742
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 1.1.0
            Reporter: Valentino Proietti


{color:#FF0000}This junit test simply fails:{color}

@Test

*public* *void* globalTable() {

StreamsBuilder builder = *new* StreamsBuilder();

@SuppressWarnings("unused")

*final* KTable<String,String> localTable = builder

.table("local", 

Consumed._with_(Serdes._String_(), Serdes._String_()),

Materialized._as_("localStore"))

;

@SuppressWarnings("unused")

*final* GlobalKTable<String,String> globalTable = builder

.globalTable("global", 

Consumed._with_(Serdes._String_(), Serdes._String_()),

        Materialized._as_("globalStore"))

;

//

Properties props = *new* Properties();

props.setProperty(StreamsConfig.*_APPLICATION_ID_CONFIG_*, "test");

props.setProperty(StreamsConfig.*_BOOTSTRAP_SERVERS_CONFIG_*, "localhost");

TopologyTestDriver testDriver = *new* TopologyTestDriver(builder.build(), props);

//

*final* KeyValueStore<String,String> localStore = testDriver.getKeyValueStore("localStore");

Assert._assertNotNull_(localStore);

Assert._assertNotNull_(testDriver.getAllStateStores().get("localStore"));

//

*final* KeyValueStore<String,String> globalStore = testDriver.getKeyValueStore("globalStore");

Assert._assertNotNull_(globalStore);

Assert._assertNotNull_(testDriver.getAllStateStores().get("globalStore"));

//

    *final* ConsumerRecordFactory<String,String> crf = *new* ConsumerRecordFactory<>(*new* StringSerializer(), *new* StringSerializer());

testDriver.pipeInput(crf.create("local", "one", "TheOne"));

testDriver.pipeInput(crf.create("global", "one", "TheOne"));

//

Assert._assertEquals_("TheOne", localStore.get("one"));

Assert._assertEquals_("TheOne", globalStore.get("one"));

 

 

{color:#FF0000}to make it work I had to modify the TopologyTestDriver class as follow:{color}

...

    *public* Map<String, StateStore> getAllStateStores() {

//        final Map<String, StateStore> allStores = new HashMap<>();

//        for (final String storeName : internalTopologyBuilder.allStateStoreName()) {

//            allStores.put(storeName, ((ProcessorContextImpl) task.context()).getStateMgr().getStore(storeName));

//        }

//        return allStores;

    {color:#FF0000}// *FIXME*{color}

    *final* ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr();

        *final* Map<String, StateStore> allStores = *new* HashMap<>();

        *for* (*final* String storeName : internalTopologyBuilder.allStateStoreName()) {

            StateStore res = psm.getStore(storeName);

            *if* (res == *null*)

            res = psm.getGlobalStore(storeName);

            allStores.put(storeName, res);

        }

        *return* allStores;

    }

...

    *public* StateStore getStateStore(*final* String name) {

//        return ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);

        {color:#FF0000}// *FIXME*{color}

    *final* ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr();

        StateStore res = psm.getStore(name);

        *if* (res == *null*)

        res = psm.getGlobalStore(name);

        *return* res;

    }

 

{color:#FF0000}moreover I think it would be very useful to make the internal MockProducer public for testing cases where a producer is used along side with the "normal" stream processing by adding the method:{color}

    /**

     * *@return* records sent with this producer are automatically streamed to the topology.

     */

    *public* *final* Producer<*byte*[], *byte*[]> getProducer() {

    *return* producer;

    }

 

{color:#FF0000}unfortunately this introduces another problem that could be verified by adding the following lines to the previous junit test:{color}

...

**

//

ConsumerRecord<*byte*[],*byte*[]> cr = crf.create("dummy", "two", "Second"); // just to serialize keys and values

testDriver.getProducer().send(*new* ProducerRecord<>("local", *null*, cr.timestamp(), cr.key(), cr.value()));

testDriver.getProducer().send(*new* ProducerRecord<>("global", *null*, cr.timestamp(), cr.key(), cr.value()));

testDriver.advanceWallClockTime(0);

Assert._assertEquals_("TheOne", localStore.get("one"));

Assert._assertEquals_("Second", localStore.get("two"));

Assert._assertEquals_("TheOne", globalStore.get("one"));

Assert._assertEquals_("Second", globalStore.get("two"));

}

 

{color:#FF0000}that could be fixed with:{color}

 

    *private* *void* captureOutputRecords() {

        // Capture all the records sent to the producer ...

        *final* List<ProducerRecord<*byte*[], *byte*[]>> output = producer.history();

        producer.clear();

        *for* (*final* ProducerRecord<*byte*[], *byte*[]> record : output) {

            Queue<ProducerRecord<*byte*[], *byte*[]>> outputRecords = outputRecordsByTopic.get(record.topic());

            *if* (outputRecords == *null*) {

                outputRecords = *new* LinkedList<>();

                outputRecordsByTopic.put(record.topic(), outputRecords);

            }

            outputRecords.add(record);

 

            // Forward back into the topology if the produced record is to an internal or a source topic ...

            *final* String outputTopicName = record.topic();

            *if* (internalTopics.contains(outputTopicName) || processorTopology.sourceTopics().contains(outputTopicName)

            || globalPartitionsByTopic.containsKey(outputTopicName)) {  {color:#FF0000}// *FIXME*{color}

                *final* *byte*[] serializedKey = record.key();

                *final* *byte*[] serializedValue = record.value();

 

                pipeInput(*new* ConsumerRecord<>(

                    outputTopicName,

                    -1,

                    -1L,

                    record.timestamp(),

                    TimestampType.*_CREATE_TIME_*,

                    0L,

                    serializedKey == *null* ? 0 : serializedKey.length,

                    serializedValue == *null* ? 0 : serializedValue.length,

                    serializedKey,

                    serializedValue));

            }

        }

    }

 

 

 

*Thank you*



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)