You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Valentino Proietti (JIRA)" <ji...@apache.org> on 2018/04/03 10:08:00 UTC
[jira] [Created] (KAFKA-6742) TopologyTestDriver error when dealing
with stores from GlobalKTable
Valentino Proietti created KAFKA-6742:
-----------------------------------------
Summary: TopologyTestDriver error when dealing with stores from GlobalKTable
Key: KAFKA-6742
URL: https://issues.apache.org/jira/browse/KAFKA-6742
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 1.1.0
Reporter: Valentino Proietti
{color:#FF0000}The following JUnit test fails:{color}
@Test
*public* *void* globalTable() {
StreamsBuilder builder = *new* StreamsBuilder();
@SuppressWarnings("unused")
*final* KTable<String,String> localTable = builder
.table("local",
Consumed._with_(Serdes._String_(), Serdes._String_()),
Materialized._as_("localStore"))
;
@SuppressWarnings("unused")
*final* GlobalKTable<String,String> globalTable = builder
.globalTable("global",
Consumed._with_(Serdes._String_(), Serdes._String_()),
Materialized._as_("globalStore"))
;
//
Properties props = *new* Properties();
props.setProperty(StreamsConfig.*_APPLICATION_ID_CONFIG_*, "test");
props.setProperty(StreamsConfig.*_BOOTSTRAP_SERVERS_CONFIG_*, "localhost");
TopologyTestDriver testDriver = *new* TopologyTestDriver(builder.build(), props);
//
*final* KeyValueStore<String,String> localStore = testDriver.getKeyValueStore("localStore");
Assert._assertNotNull_(localStore);
Assert._assertNotNull_(testDriver.getAllStateStores().get("localStore"));
//
*final* KeyValueStore<String,String> globalStore = testDriver.getKeyValueStore("globalStore");
Assert._assertNotNull_(globalStore);
Assert._assertNotNull_(testDriver.getAllStateStores().get("globalStore"));
//
*final* ConsumerRecordFactory<String,String> crf = *new* ConsumerRecordFactory<>(*new* StringSerializer(), *new* StringSerializer());
testDriver.pipeInput(crf.create("local", "one", "TheOne"));
testDriver.pipeInput(crf.create("global", "one", "TheOne"));
//
Assert._assertEquals_("TheOne", localStore.get("one"));
Assert._assertEquals_("TheOne", globalStore.get("one"));
{color:#FF0000}To make it work, I had to modify the TopologyTestDriver class as follows:{color}
...
*public* Map<String, StateStore> getAllStateStores() {
// final Map<String, StateStore> allStores = new HashMap<>();
// for (final String storeName : internalTopologyBuilder.allStateStoreName()) {
// allStores.put(storeName, ((ProcessorContextImpl) task.context()).getStateMgr().getStore(storeName));
// }
// return allStores;
{color:#FF0000}// *FIXME*{color}
*final* ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr();
*final* Map<String, StateStore> allStores = *new* HashMap<>();
*for* (*final* String storeName : internalTopologyBuilder.allStateStoreName()) {
StateStore res = psm.getStore(storeName);
*if* (res == *null*)
res = psm.getGlobalStore(storeName);
allStores.put(storeName, res);
}
*return* allStores;
}
...
*public* StateStore getStateStore(*final* String name) {
// return ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);
{color:#FF0000}// *FIXME*{color}
*final* ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr();
StateStore res = psm.getStore(name);
*if* (res == *null*)
res = psm.getGlobalStore(name);
*return* res;
}
{color:#FF0000}Moreover, I think it would be very useful to make the internal MockProducer publicly accessible, for testing cases where a producer is used alongside the "normal" stream processing. This could be done by adding the following method:{color}
/**
* *@return* records sent with this producer are automatically streamed to the topology.
*/
*public* *final* Producer<*byte*[], *byte*[]> getProducer() {
*return* producer;
}
{color:#FF0000}Unfortunately, this introduces another problem, which can be reproduced by adding the following lines to the end of the previous JUnit test:{color}
...
**
//
ConsumerRecord<*byte*[],*byte*[]> cr = crf.create("dummy", "two", "Second"); // just to serialize keys and values
testDriver.getProducer().send(*new* ProducerRecord<>("local", *null*, cr.timestamp(), cr.key(), cr.value()));
testDriver.getProducer().send(*new* ProducerRecord<>("global", *null*, cr.timestamp(), cr.key(), cr.value()));
testDriver.advanceWallClockTime(0);
Assert._assertEquals_("TheOne", localStore.get("one"));
Assert._assertEquals_("Second", localStore.get("two"));
Assert._assertEquals_("TheOne", globalStore.get("one"));
Assert._assertEquals_("Second", globalStore.get("two"));
}
{color:#FF0000}That second problem could be fixed with the following change (see the line marked FIXME):{color}
*private* *void* captureOutputRecords() {
// Capture all the records sent to the producer ...
*final* List<ProducerRecord<*byte*[], *byte*[]>> output = producer.history();
producer.clear();
*for* (*final* ProducerRecord<*byte*[], *byte*[]> record : output) {
Queue<ProducerRecord<*byte*[], *byte*[]>> outputRecords = outputRecordsByTopic.get(record.topic());
*if* (outputRecords == *null*) {
outputRecords = *new* LinkedList<>();
outputRecordsByTopic.put(record.topic(), outputRecords);
}
outputRecords.add(record);
// Forward back into the topology if the produced record is to an internal or a source topic ...
*final* String outputTopicName = record.topic();
*if* (internalTopics.contains(outputTopicName) || processorTopology.sourceTopics().contains(outputTopicName)
|| globalPartitionsByTopic.containsKey(outputTopicName)) { {color:#FF0000}// *FIXME*{color}
*final* *byte*[] serializedKey = record.key();
*final* *byte*[] serializedValue = record.value();
pipeInput(*new* ConsumerRecord<>(
outputTopicName,
-1,
-1L,
record.timestamp(),
TimestampType.*_CREATE_TIME_*,
0L,
serializedKey == *null* ? 0 : serializedKey.length,
serializedValue == *null* ? 0 : serializedValue.length,
serializedKey,
serializedValue));
}
}
}
*Thank you*
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)