You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/01/26 17:35:14 UTC

[kafka] branch trunk updated: KAFKA-6205: initialize topology after state stores restoration completed

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c615c59  KAFKA-6205: initialize topology after state stores restoration completed
c615c59 is described below

commit c615c597aa24cc5a4552102f1b6d27fcf4e24d12
Author: Bill Bejeck <bi...@confluent.io>
AuthorDate: Fri Jan 26 09:24:40 2018 -0800

    KAFKA-6205: initialize topology after state stores restoration completed
    
    Initialize topology after state store restoration.
    IMHO, updating some of the existing tests already demonstrates the correct order of operations. I'll probably add an integration test as well, but I wanted to get this PR in for feedback on the approach.
    
    Author: Bill Bejeck <bi...@confluent.io>
    
    Reviewers: Guozhang Wang <wa...@gmail.com>, Matthias J. Sax <mj...@apache.org>
    
    Closes #4415 from bbejeck/KAFKA-6205_restore_state_stores_before_initializing_topology
    
    minor log4j edits
---
 .../streams/processor/internals/AbstractTask.java  |  5 +-
 .../streams/processor/internals/AssignedTasks.java |  3 +-
 .../streams/processor/internals/StandbyTask.java   | 10 +++-
 .../streams/processor/internals/StreamTask.java    | 13 +++--
 .../kafka/streams/processor/internals/Task.java    |  4 +-
 .../streams/processor/internals/TaskManager.java   |  1 +
 .../processor/internals/AbstractTaskTest.java      |  9 ++--
 .../internals/AssignedStreamsTasksTest.java        | 30 ++++++++---
 .../processor/internals/StandbyTaskTest.java       | 10 ++--
 .../processor/internals/StreamTaskTest.java        | 58 ++++++++++++++--------
 .../StreamThreadStateStoreProviderTest.java        |  4 +-
 .../kafka/test/ProcessorTopologyTestDriver.java    |  3 +-
 12 files changed, 100 insertions(+), 50 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 2b8af6d..d9c827f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -196,10 +196,11 @@ public abstract class AbstractTask implements Task {
     }
 
     /**
-     * @throws IllegalStateException If store gets registered after initialized is already finished
      * @throws StreamsException if the store's change log does not contain the partition
+     *
+     * Package-private for testing only
      */
-    void initializeStateStores() {
+    void registerStateStores() {
         if (topology.stateStores().isEmpty()) {
             return;
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index b90ec10..2cd82f4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -99,7 +99,7 @@ abstract class AssignedTasks<T extends Task> {
         for (final Iterator<Map.Entry<TaskId, T>> it = created.entrySet().iterator(); it.hasNext(); ) {
             final Map.Entry<TaskId, T> entry = it.next();
             try {
-                if (!entry.getValue().initialize()) {
+                if (!entry.getValue().initializeStateStores()) {
                     log.debug("Transitioning {} {} to restoring", taskTypeName, entry.getKey());
                     addToRestoring(entry.getValue());
                 } else {
@@ -272,6 +272,7 @@ abstract class AssignedTasks<T extends Task> {
     private void transitionToRunning(final T task, final Set<TopicPartition> readyPartitions) {
         log.debug("transitioning {} {} to running", taskTypeName, task.id());
         running.put(task.id(), task);
+        task.initializeTopology();
         for (TopicPartition topicPartition : task.partitions()) {
             runningByPartition.put(topicPartition, task);
             if (task.hasStateStores()) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 837f607..39d34d7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -62,14 +62,20 @@ public class StandbyTask extends AbstractTask {
     }
 
     @Override
-    public boolean initialize() {
-        initializeStateStores();
+    public boolean initializeStateStores() {
+        log.trace("Initializing state stores");
+        registerStateStores();
         checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointed());
         processorContext.initialized();
         taskInitialized = true;
         return true;
     }
 
+    @Override
+    public void initializeTopology() {
+        //no-op
+    }
+
     /**
      * <pre>
      * - update offset limits
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 7063e74..e293f25 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -161,16 +161,19 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
     }
 
     @Override
-    public boolean initialize() {
-        log.trace("Initializing");
-        initializeStateStores();
+    public boolean initializeStateStores() {
+        log.trace("Initializing state stores");
+        registerStateStores();
+        return changelogPartitions().isEmpty();
+    }
+
+    @Override
+    public void initializeTopology() {
         initTopology();
         processorContext.initialized();
         taskInitialized = true;
-        return changelogPartitions().isEmpty();
     }
 
-
     /**
      * <pre>
      * - re-initialize the task
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index f066bff..5f221e3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -32,7 +32,9 @@ public interface Task {
      * @throws IllegalStateException If store gets registered after initialized is already finished
      * @throws StreamsException if the store's change log does not contain the partition
      */
-    boolean initialize();
+    boolean initializeStateStores();
+
+    void initializeTopology();
 
     void commit();
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index bdc1c00..62ddacf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -327,6 +327,7 @@ class TaskManager {
         standby.initializeNewTasks();
 
         final Collection<TopicPartition> restored = changelogReader.restore(active);
+
         resumed.addAll(active.updateRestored(restored));
 
         if (!resumed.isEmpty()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index 776110c..4569858 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -101,7 +101,7 @@ public class AbstractTaskTest {
         final AbstractTask task = createTask(consumer, Collections.singletonMap(store, "dummy"));
 
         try {
-            task.initializeStateStores();
+            task.registerStateStores();
             fail("Should have thrown LockException");
         } catch (final LockException e) {
             // ok
@@ -116,7 +116,7 @@ public class AbstractTaskTest {
 
         final AbstractTask task = createTask(consumer, Collections.<StateStore, String>emptyMap());
 
-        task.initializeStateStores();
+        task.registerStateStores();
 
         // should fail if lock is called
         EasyMock.verify(stateDirectory);
@@ -254,9 +254,12 @@ public class AbstractTaskTest {
             public void closeSuspended(final boolean clean, final boolean isZombie, final RuntimeException e) {}
 
             @Override
-            public boolean initialize() {
+            public boolean initializeStateStores() {
                 return false;
             }
+
+            @Override
+            public void initializeTopology() {}
         };
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index 3d33b0b..4bb7828 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -89,7 +89,7 @@ public class AssignedStreamsTasksTest {
 
     @Test
     public void shouldInitializeNewTasks() {
-        EasyMock.expect(t1.initialize()).andReturn(false);
+        EasyMock.expect(t1.initializeStateStores()).andReturn(false);
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
         EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
         EasyMock.replay(t1);
@@ -101,10 +101,14 @@ public class AssignedStreamsTasksTest {
 
     @Test
     public void shouldMoveInitializedTasksNeedingRestoreToRestoring() {
-        EasyMock.expect(t1.initialize()).andReturn(false);
+        EasyMock.expect(t1.initializeStateStores()).andReturn(false);
+        t1.initializeTopology();
+        EasyMock.expectLastCall().once();
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
         EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
-        EasyMock.expect(t2.initialize()).andReturn(true);
+        EasyMock.expect(t2.initializeStateStores()).andReturn(true);
+        t2.initializeTopology();
+        EasyMock.expectLastCall().once();
         final Set<TopicPartition> t2partitions = Collections.singleton(tp2);
         EasyMock.expect(t2.partitions()).andReturn(t2partitions);
         EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
@@ -125,7 +129,9 @@ public class AssignedStreamsTasksTest {
 
     @Test
     public void shouldMoveInitializedTasksThatDontNeedRestoringToRunning() {
-        EasyMock.expect(t2.initialize()).andReturn(true);
+        EasyMock.expect(t2.initializeStateStores()).andReturn(true);
+        t2.initializeTopology();
+        EasyMock.expectLastCall().once();
         EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
         EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
         EasyMock.expect(t2.hasStateStores()).andReturn(false);
@@ -142,10 +148,12 @@ public class AssignedStreamsTasksTest {
     @Test
     public void shouldTransitionFullyRestoredTasksToRunning() {
         final Set<TopicPartition> task1Partitions = Utils.mkSet(tp1);
-        EasyMock.expect(t1.initialize()).andReturn(false);
+        EasyMock.expect(t1.initializeStateStores()).andReturn(false);
         EasyMock.expect(t1.partitions()).andReturn(task1Partitions).anyTimes();
         EasyMock.expect(t1.changelogPartitions()).andReturn(Utils.mkSet(changeLog1, changeLog2)).anyTimes();
         EasyMock.expect(t1.hasStateStores()).andReturn(true).anyTimes();
+        t1.initializeTopology();
+        EasyMock.expectLastCall().once();
         EasyMock.replay(t1);
 
         addAndInitTask();
@@ -169,7 +177,7 @@ public class AssignedStreamsTasksTest {
 
     @Test
     public void shouldCloseRestoringTasks() {
-        EasyMock.expect(t1.initialize()).andReturn(false);
+        EasyMock.expect(t1.initializeStateStores()).andReturn(false);
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
         EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
         t1.close(false, false);
@@ -236,6 +244,8 @@ public class AssignedStreamsTasksTest {
         mockRunningTaskSuspension();
         t1.resume();
         EasyMock.expectLastCall();
+        t1.initializeTopology();
+        EasyMock.expectLastCall().once();
         EasyMock.replay(t1);
 
         assertThat(suspendTask(), nullValue());
@@ -266,7 +276,9 @@ public class AssignedStreamsTasksTest {
     }
 
     private void mockTaskInitialization() {
-        EasyMock.expect(t1.initialize()).andReturn(true);
+        EasyMock.expect(t1.initializeStateStores()).andReturn(true);
+        t1.initializeTopology();
+        EasyMock.expectLastCall().once();
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
         EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
         EasyMock.expect(t1.hasStateStores()).andReturn(false);
@@ -449,7 +461,9 @@ public class AssignedStreamsTasksTest {
     }
 
     private void mockRunningTaskSuspension() {
-        EasyMock.expect(t1.initialize()).andReturn(true);
+        EasyMock.expect(t1.initializeStateStores()).andReturn(true);
+        t1.initializeTopology();
+        EasyMock.expectLastCall().once();
         EasyMock.expect(t1.hasStateStores()).andReturn(false).anyTimes();
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
         EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList()).anyTimes();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index ce655bc..605ab33 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -155,7 +155,7 @@ public class StandbyTaskTest {
     public void testStorePartitions() throws IOException {
         StreamsConfig config = createConfig(baseDir);
         StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
-        task.initialize();
+        task.initializeStateStores();
         assertEquals(Utils.mkSet(partition2, partition1), new HashSet<>(task.checkpointedOffsets().keySet()));
     }
 
@@ -177,7 +177,7 @@ public class StandbyTaskTest {
     public void testUpdate() throws IOException {
         StreamsConfig config = createConfig(baseDir);
         StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
-        task.initialize();
+        task.initializeStateStores();
         final Set<TopicPartition> partition = Collections.singleton(partition2);
         restoreStateConsumer.assign(partition);
 
@@ -223,7 +223,7 @@ public class StandbyTaskTest {
 
         StreamsConfig config = createConfig(baseDir);
         StandbyTask task = new StandbyTask(taskId, ktablePartitions, ktableTopology, consumer, changelogReader, config, null, stateDirectory);
-        task.initialize();
+        task.initializeStateStores();
         restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet()));
 
         for (ConsumerRecord<Integer, Integer> record : Arrays.asList(
@@ -344,7 +344,7 @@ public class StandbyTaskTest {
                                                  null,
                                                  stateDirectory
         );
-        task.initialize();
+        task.initializeStateStores();
 
 
         restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet()));
@@ -396,7 +396,7 @@ public class StandbyTaskTest {
                 closedStateManager.set(true);
             }
         };
-        task.initialize();
+        task.initializeStateStores();
         try {
             task.close(true, false);
             fail("should have thrown exception");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index a3ff328..92cfe66 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -318,7 +318,8 @@ public class StreamTaskTest {
     @Test
     public void testMaybePunctuateStreamTime() {
         task = createStatelessTask(false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
 
         task.addRecords(partition1, records(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
@@ -384,7 +385,8 @@ public class StreamTaskTest {
     @Test
     public void testCancelPunctuateStreamTime() {
         task = createStatelessTask(false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
 
         task.addRecords(partition1, records(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
@@ -416,7 +418,8 @@ public class StreamTaskTest {
     @Test
     public void shouldPunctuateSystemTimeWhenIntervalElapsed() {
         task = createStatelessTask(false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         long now = time.milliseconds();
         time.sleep(10);
         assertTrue(task.maybePunctuateSystemTime());
@@ -430,7 +433,8 @@ public class StreamTaskTest {
     @Test
     public void shouldNotPunctuateSystemTimeWhenIntervalNotElapsed() {
         task = createStatelessTask(false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         long now = time.milliseconds();
         assertTrue(task.maybePunctuateSystemTime()); // first time we always punctuate
         time.sleep(9);
@@ -441,7 +445,8 @@ public class StreamTaskTest {
     @Test
     public void testCancelPunctuateSystemTime() {
         task = createStatelessTask(false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         long now = time.milliseconds();
         time.sleep(10);
         assertTrue(task.maybePunctuateSystemTime());
@@ -454,7 +459,8 @@ public class StreamTaskTest {
     @Test
     public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContext() {
         task = createTaskThatThrowsException();
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         task.addRecords(partition2, Collections.singletonList(
                 new ConsumerRecord<>(partition2.topic(), partition2.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
 
@@ -469,7 +475,8 @@ public class StreamTaskTest {
     @Test
     public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingStreamTime() {
         task = createStatelessTask(false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
 
         try {
             task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, new Punctuator() {
@@ -489,7 +496,8 @@ public class StreamTaskTest {
     @Test
     public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingWallClockTimeTime() {
         task = createStatelessTask(false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
 
         try {
             task.punctuate(processorSystemTime, 1, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
@@ -531,7 +539,8 @@ public class StreamTaskTest {
     @Test
     public void shouldCheckpointOffsetsOnCommit() throws IOException {
         task = createStatefulTask(false, true);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         task.commit();
         final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId00),
                                                                           ProcessorStateManager.CHECKPOINT_FILE_NAME));
@@ -542,7 +551,8 @@ public class StreamTaskTest {
     @Test
     public void shouldNotCheckpointOffsetsOnCommitIfEosIsEnabled() {
         task = createStatefulTask(true, true);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         task.commit();
         final File checkpointFile = new File(stateDirectory.directoryForTask(taskId00),
                                              ProcessorStateManager.CHECKPOINT_FILE_NAME);
@@ -553,7 +563,8 @@ public class StreamTaskTest {
     @Test
     public void shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() {
         task = createStatelessTask(false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         task.processorContext.setCurrentNode(processorStreamTime);
         try {
             task.punctuate(processorStreamTime, 10, PunctuationType.STREAM_TIME, punctuator);
@@ -566,7 +577,8 @@ public class StreamTaskTest {
     @Test
     public void shouldCallPunctuateOnPassedInProcessorNode() {
         task = createStatelessTask(false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         task.punctuate(processorStreamTime, 5, PunctuationType.STREAM_TIME, punctuator);
         assertThat(punctuatedAt, equalTo(5L));
         task.punctuate(processorStreamTime, 10, PunctuationType.STREAM_TIME, punctuator);
@@ -576,7 +588,8 @@ public class StreamTaskTest {
     @Test
     public void shouldSetProcessorNodeOnContextBackToNullAfterSuccesfullPunctuate() {
         task = createStatelessTask(false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         task.punctuate(processorStreamTime, 5, PunctuationType.STREAM_TIME, punctuator);
         assertThat(((ProcessorContextImpl) task.context()).currentNode(), nullValue());
     }
@@ -607,7 +620,8 @@ public class StreamTaskTest {
     @Test
     public void shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseButStillCloseAllProcessorNodesTopology() {
         task = createTaskThatThrowsException();
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         try {
             task.close(true, false);
             fail("should have thrown runtime exception");
@@ -760,7 +774,8 @@ public class StreamTaskTest {
     @Test
     public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringFlushing() {
         task = createTaskThatThrowsException();
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
 
         try {
             task.commit();
@@ -774,7 +789,8 @@ public class StreamTaskTest {
     public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringTaskSuspension() {
         final StreamTask task = createTaskThatThrowsException();
 
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         try {
             task.suspend();
             fail("should have thrown an exception");
@@ -786,7 +802,8 @@ public class StreamTaskTest {
     @Test
     public void shouldCloseStateManagerIfFailureOnTaskClose() {
         task = createStatefulTaskThatThrowsExceptionOnClose(true, false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
 
         try {
             task.close(true, false);
@@ -813,14 +830,14 @@ public class StreamTaskTest {
     public void shouldBeInitializedIfChangelogPartitionsIsEmpty() {
         final StreamTask task = createStatefulTask(false, false);
 
-        assertTrue(task.initialize());
+        assertTrue(task.initializeStateStores());
     }
 
     @Test
     public void shouldNotBeInitializedIfChangelogPartitionsIsNonEmpty() {
         final StreamTask task = createStatefulTask(false, true);
 
-        assertFalse(task.initialize());
+        assertFalse(task.initializeStateStores());
     }
 
     @Test
@@ -840,7 +857,8 @@ public class StreamTaskTest {
 
         task = new StreamTask(taskId00, Utils.mkSet(partition1, repartition), topology, consumer, changelogReader, config,
                 streamsMetrics, stateDirectory, null, time, producer);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
 
         task.addRecords(partition1, Collections.singletonList(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 5L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 9821e4c..85c282c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -103,12 +103,12 @@ public class StreamThreadStateStoreProviderTest {
         stateDirectory = new StateDirectory(streamsConfig, new MockTime());
         taskOne = createStreamsTask(applicationId, streamsConfig, clientSupplier, topology,
                                     new TaskId(0, 0));
-        taskOne.initialize();
+        taskOne.initializeStateStores();
         tasks.put(new TaskId(0, 0),
                   taskOne);
         taskTwo = createStreamsTask(applicationId, streamsConfig, clientSupplier, topology,
                                     new TaskId(0, 1));
-        taskTwo.initialize();
+        taskTwo.initializeStateStores();
         tasks.put(new TaskId(0, 1),
                   taskTwo);
 
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index ae0cc9c..e5f15bc 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -248,7 +248,8 @@ public class ProcessorTopologyTestDriver {
                                   cache,
                                   new MockTime(),
                                   producer);
-            task.initialize();
+            task.initializeStateStores();
+            task.initializeTopology();
         }
     }
 

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.