You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/04/17 16:41:34 UTC

[kafka] branch 1.1 updated: KAFKA-6742: TopologyTestDriver error when dealing with stores from GlobalKTable

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 24355c5  KAFKA-6742: TopologyTestDriver error when dealing with stores from GlobalKTable
24355c5 is described below

commit 24355c538b1f11f882f32c4ce8873453f966f8c0
Author: Valentino Proietti <va...@kydea.com>
AuthorDate: Tue Apr 17 09:39:33 2018 -0700

    KAFKA-6742: TopologyTestDriver error when dealing with stores from GlobalKTable
    
    guozhangwang
    
    While TopologyTestDriver works well with stores created from KTable it does not with stores from GlobalKTable.
    Moreover, for my testing purposes but I think it can be useful to others, I need to get access to the MockProducer inside TopologyTestDriver.
    
    I have added 4 new tests to TopologyTestDriverTest, two for stores from KTable and two for stores from GlobalKTable.
    
    While I was changing the TopologyTestDriver I've also make it implement java.io.Closeable.
    
    Author: Valentino Proietti <va...@kydea.com>
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, John Roesler <jo...@confluent.io>, Guozhang Wang <wa...@gmail.com>
    
    Closes #4823 from Vale68/KAFKA-6742
    
    minor renaming
    
    (cherry picked from commit 01eddce01f94887d5f15d32d8050e8a526599fcc)
    Signed-off-by: Guozhang Wang <wa...@gmail.com>
---
 .../apache/kafka/streams/TopologyTestDriver.java   | 43 +++++++++++++++-------
 .../kafka/streams/TopologyTestDriverTest.java      | 22 +++++++++++
 2 files changed, 52 insertions(+), 13 deletions(-)

diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index abcc99d..e814fec 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -41,6 +41,7 @@ import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.GlobalStateManager;
 import org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl;
 import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
@@ -59,6 +60,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.streams.test.OutputVerifier;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -162,18 +164,20 @@ import java.util.concurrent.atomic.AtomicLong;
  * @see OutputVerifier
  */
 @InterfaceStability.Evolving
-public class TopologyTestDriver {
+public class TopologyTestDriver implements Closeable {
 
     private final Time mockTime;
     private final InternalTopologyBuilder internalTopologyBuilder;
 
     private final static int PARTITION_ID = 0;
     private final static TaskId TASK_ID = new TaskId(0, PARTITION_ID);
-    private StreamTask task;
-    private GlobalStateUpdateTask globalStateTask;
+    private final StreamTask task;
+    private final GlobalStateUpdateTask globalStateTask;
+    private final GlobalStateManager globalStateManager;
 
     private final StateDirectory stateDirectory;
     private final ProcessorTopology processorTopology;
+    
     private final MockProducer<byte[], byte[]> producer;
 
     private final Set<String> internalTopics = new HashSet<>();
@@ -264,7 +268,7 @@ public class TopologyTestDriver {
                 consumer.updateEndOffsets(Collections.singletonMap(partition, 0L));
             }
 
-            final GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl(
+            globalStateManager = new GlobalStateManagerImpl(
                 new LogContext("mock "),
                 globalTopology,
                 consumer,
@@ -273,16 +277,19 @@ public class TopologyTestDriver {
                 streamsConfig);
 
             final GlobalProcessorContextImpl globalProcessorContext
-                = new GlobalProcessorContextImpl(streamsConfig, stateManager, streamsMetrics, cache);
-            stateManager.setGlobalProcessorContext(globalProcessorContext);
+                = new GlobalProcessorContextImpl(streamsConfig, globalStateManager, streamsMetrics, cache);
+            globalStateManager.setGlobalProcessorContext(globalProcessorContext);
 
             globalStateTask = new GlobalStateUpdateTask(
                 globalTopology,
                 globalProcessorContext,
-                stateManager,
+                globalStateManager,
                 new LogAndContinueExceptionHandler(),
                 new LogContext());
             globalStateTask.initialize();
+        } else {
+            globalStateManager = null;
+            globalStateTask = null;
         }
 
         if (!partitionsByTopic.isEmpty()) {
@@ -303,6 +310,8 @@ public class TopologyTestDriver {
                 producer);
             task.initializeStateStores();
             task.initializeTopology();
+        } else {
+            task = null;
         }
     }
 
@@ -372,7 +381,8 @@ public class TopologyTestDriver {
 
             // Forward back into the topology if the produced record is to an internal or a source topic ...
             final String outputTopicName = record.topic();
-            if (internalTopics.contains(outputTopicName) || processorTopology.sourceTopics().contains(outputTopicName)) {
+            if (internalTopics.contains(outputTopicName) || processorTopology.sourceTopics().contains(outputTopicName)
+                    || globalPartitionsByTopic.containsKey(outputTopicName)) {
                 final byte[] serializedKey = record.key();
                 final byte[] serializedValue = record.value();
 
@@ -410,8 +420,10 @@ public class TopologyTestDriver {
      */
     public void advanceWallClockTime(final long advanceMs) {
         mockTime.sleep(advanceMs);
-        task.maybePunctuateSystemTime();
-        task.commit();
+        if (task != null) {
+            task.maybePunctuateSystemTime();
+            task.commit();
+        }
         captureOutputRecords();
     }
 
@@ -450,7 +462,7 @@ public class TopologyTestDriver {
         final V value = valueDeserializer.deserialize(record.topic(), record.value());
         return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), key, value);
     }
-
+    
     /**
      * Get all {@link StateStore StateStores} from the topology.
      * The stores can be a "regular" or global stores.
@@ -467,7 +479,7 @@ public class TopologyTestDriver {
     public Map<String, StateStore> getAllStateStores() {
         final Map<String, StateStore> allStores = new HashMap<>();
         for (final String storeName : internalTopologyBuilder.allStateStoreName()) {
-            allStores.put(storeName, ((ProcessorContextImpl) task.context()).getStateMgr().getStore(storeName));
+            allStores.put(storeName, getStateStore(storeName));
         }
         return allStores;
     }
@@ -487,7 +499,12 @@ public class TopologyTestDriver {
      * @see #getSessionStore(String)
      */
     public StateStore getStateStore(final String name) {
-        return ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);
+        StateStore stateStore = task == null ? null :
+            ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);
+        if (stateStore == null && globalStateManager != null) {
+            stateStore = globalStateManager.getGlobalStore(name);
+        }
+        return stateStore;
     }
 
     /**
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index b74a754..d757f33 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -584,6 +585,10 @@ public class TopologyTestDriverTest {
     public void shouldPopulateGlobalStore() {
         testDriver = new TopologyTestDriver(setupGlobalStoreTopology(SOURCE_TOPIC_1), config);
 
+        final KeyValueStore<byte[], byte[]> globalStore = testDriver.getKeyValueStore(SOURCE_TOPIC_1 + "-globalStore");
+        Assert.assertNotNull(globalStore);
+        Assert.assertNotNull(testDriver.getAllStateStores().get(SOURCE_TOPIC_1 + "-globalStore"));
+
         testDriver.pipeInput(consumerRecord1);
 
         final List<Record> processedRecords = mockProcessors.get(0).processedRecords;
@@ -897,4 +902,21 @@ public class TopologyTestDriverTest {
             );
         }
     }
+    
+    @Test
+    public void shouldFeedStoreFromGlobalKTable() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.globalTable("topic",  
+            Consumed.with(Serdes.String(), Serdes.String()),
+            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("globalStore"));
+        try (final TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), config)) {
+            final KeyValueStore<String, String> globalStore = testDriver.getKeyValueStore("globalStore");
+            Assert.assertNotNull(globalStore);
+            Assert.assertNotNull(testDriver.getAllStateStores().get("globalStore"));
+            final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+            testDriver.pipeInput(recordFactory.create("topic", "k1", "value1"));
+            // we expect to have both in the global store, the one from pipeInput and the one from the producer
+            Assert.assertEquals("value1", globalStore.get("k1"));
+        }
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.