You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/06/13 00:28:35 UTC

[kafka] branch 0.11.0 updated: KAFKA-6205: initialize topology after state stores restoration completed

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 0.11.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/0.11.0 by this push:
     new fac2fa0  KAFKA-6205: initialize topology after state stores restoration completed
fac2fa0 is described below

commit fac2fa0f460ceccec63c7e9123812b98036c148d
Author: Bill Bejeck <bi...@confluent.io>
AuthorDate: Fri Jan 26 09:24:40 2018 -0800

    KAFKA-6205: initialize topology after state stores restoration completed
    
    Initialize topology after state store restoration.
    Although, IMHO, the updates to some of the existing tests demonstrate the correct order of operations, I'll probably add an integration test as well; I wanted to get this PR up first for feedback on the approach.
    
    Author: Bill Bejeck <bi...@confluent.io>
    
    Reviewers: Guozhang Wang <wa...@gmail.com>, Matthias J. Sax <mj...@apache.org>
    
    Closes #4415 from bbejeck/KAFKA-6205_restore_state_stores_before_initializing_topology
    
    Minor log4j edits.
---
 .../streams/processor/internals/AbstractTask.java  | 13 +++++++---
 .../streams/processor/internals/AssignedTasks.java |  3 ++-
 .../streams/processor/internals/StandbyTask.java   | 17 +++++++++++-
 .../streams/processor/internals/StreamTask.java    | 12 ++++++---
 .../processor/internals/AbstractTaskTest.java      |  9 ++++---
 .../processor/internals/AssignedTasksTest.java     | 30 ++++++++++++++++------
 .../processor/internals/StandbyTaskTest.java       |  4 +--
 .../processor/internals/StreamTaskTest.java        | 17 +++++++-----
 .../StreamThreadStateStoreProviderTest.java        |  4 +--
 .../kafka/test/ProcessorTopologyTestDriver.java    |  3 ++-
 10 files changed, 81 insertions(+), 31 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index fbb54ab..364fbe8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -194,7 +194,12 @@ public abstract class AbstractTask {
         stateMgr.flush();
     }
 
-    void initializeStateStores() {
+    /**
+     * @throws StreamsException if the store's change log does not contain the partition
+     *
+     * Package-private for testing only
+     */
+    void initStateStores() {
         if (topology.stateStores().isEmpty()) {
             return;
         }
@@ -248,10 +253,12 @@ public abstract class AbstractTask {
     }
 
     /**
-     * initialize the topology/state stores
+     * initialize the state stores
      * @return true if the topology is ready to run, i.e, all stores have been restored.
      */
-    public abstract boolean initialize();
+    public abstract boolean initializeStateStores();
+
+    public abstract void initializeTopology();
 
     abstract boolean process();
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 60de4a3..fc82c6e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -86,7 +86,7 @@ class AssignedTasks<T extends AbstractTask> {
         for (final Iterator<Map.Entry<TaskId, T>> it = created.entrySet().iterator(); it.hasNext(); ) {
             final Map.Entry<TaskId, T> entry = it.next();
             try {
-                if (!entry.getValue().initialize()) {
+                if (!entry.getValue().initializeStateStores()) {
                     log.debug("{} transitioning {} {} to restoring", logPrefix, taskTypeName, entry.getKey());
                     restoring.put(entry.getKey(), entry.getValue());
                 } else {
@@ -238,6 +238,7 @@ class AssignedTasks<T extends AbstractTask> {
     private void transitionToRunning(final T task) {
         log.debug("{} transitioning {} {} to running", logPrefix, taskTypeName, task.id());
         running.put(task.id(), task);
+        task.initializeTopology();
         for (TopicPartition topicPartition : task.partitions()) {
             runningByPartition.put(topicPartition, task);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index c8bce22..af3c635 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -66,6 +66,21 @@ public class StandbyTask extends AbstractTask {
         processorContext = new StandbyContextImpl(id, applicationId, config, stateMgr, metrics);
     }
 
+    @Override
+    public boolean initializeStateStores() {
+        log.trace("Initializing state stores");
+        initStateStores();
+        checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointed());
+        processorContext.initialized();
+        taskInitialized = true;
+        return true;
+    }
+
+    @Override
+    public void initializeTopology() {
+        //no-op
+    }
+
     /**
      * <pre>
      * - update offset limits
@@ -173,4 +188,4 @@ public class StandbyTask extends AbstractTask {
         throw new UnsupportedOperationException("commitNeeded not supported by standby task");
     }
 
-}
\ No newline at end of file
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 7107fc4..b628985 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -147,13 +147,17 @@ public class StreamTask extends AbstractTask implements Punctuator {
     }
 
     @Override
-    public boolean initialize() {
-        log.trace("Initializing");
-        initializeStateStores();
+    public boolean initializeStateStores() {
+        log.trace("Initializing state stores");
+        initStateStores();
+        return changelogPartitions().isEmpty();
+    }
+
+    @Override
+    public void initializeTopology() {
         initTopology();
         processorContext.initialized();
         taskInitialized = true;
-        return changelogPartitions().isEmpty();
     }
 
     /**
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index 8ba22f4..2a8cff5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -82,7 +82,7 @@ public class AbstractTaskTest {
         final AbstractTask task = createTask(consumer, Collections.singletonList(store));
 
         try {
-            task.initializeStateStores();
+            task.initStateStores();
             fail("Should have thrown LockException");
         } catch (final LockException e) {
             // ok
@@ -97,7 +97,7 @@ public class AbstractTaskTest {
 
         final AbstractTask task = createTask(consumer, Collections.<StateStore>emptyList());
 
-        task.initializeStateStores();
+        task.initStateStores();
 
         // should fail if lock is called
         EasyMock.verify(stateDirectory);
@@ -135,7 +135,7 @@ public class AbstractTaskTest {
             public void close(final boolean clean, final boolean isZombie) {}
 
             @Override
-            public boolean initialize() {
+            public boolean initializeStateStores() {
                 return false;
             }
 
@@ -153,6 +153,9 @@ public class AbstractTaskTest {
             boolean commitNeeded() {
                 return false;
             }
+
+            @Override
+            public void initializeTopology() {}
         };
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
index 4904594..2c1a27b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
@@ -96,7 +96,7 @@ public class AssignedTasksTest {
 
     @Test
     public void shouldInitializeNewTasks() {
-        EasyMock.expect(t1.initialize()).andReturn(false);
+        EasyMock.expect(t1.initializeStateStores()).andReturn(false);
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
         EasyMock.replay(t1);
 
@@ -108,8 +108,12 @@ public class AssignedTasksTest {
 
     @Test
     public void shouldMoveInitializedTasksNeedingRestoreToRestoring() {
-        EasyMock.expect(t1.initialize()).andReturn(false);
-        EasyMock.expect(t2.initialize()).andReturn(true);
+        EasyMock.expect(t1.initializeStateStores()).andReturn(false);
+        t1.initializeTopology();
+        EasyMock.expectLastCall().once();
+        EasyMock.expect(t2.initializeStateStores()).andReturn(true);
+        t2.initializeTopology();
+        EasyMock.expectLastCall().once();
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
         EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2)).anyTimes();
         EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
@@ -128,7 +132,9 @@ public class AssignedTasksTest {
 
     @Test
     public void shouldMoveInitializedTasksThatDontNeedRestoringToRunning() {
-        EasyMock.expect(t2.initialize()).andReturn(true);
+        EasyMock.expect(t2.initializeStateStores()).andReturn(true);
+        t2.initializeTopology();
+        EasyMock.expectLastCall().once();
         EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2)).anyTimes();
         EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
 
@@ -143,7 +149,9 @@ public class AssignedTasksTest {
     @Test
     public void shouldTransitionFullyRestoredTasksToRunning() {
         final Set<TopicPartition> task1Partitions = Utils.mkSet(tp1);
-        EasyMock.expect(t1.initialize()).andReturn(false);
+        EasyMock.expect(t1.initializeStateStores()).andReturn(false);
+        t1.initializeTopology();
+        EasyMock.expectLastCall().once();
         EasyMock.expect(t1.partitions()).andReturn(task1Partitions).anyTimes();
         EasyMock.expect(t1.changelogPartitions()).andReturn(Utils.mkSet(changeLog1, changeLog2)).anyTimes();
         EasyMock.replay(t1);
@@ -221,7 +229,9 @@ public class AssignedTasksTest {
     }
 
     private void mockInitializedTask() {
-        EasyMock.expect(t1.initialize()).andReturn(true);
+        EasyMock.expect(t1.initializeStateStores()).andReturn(true);
+        t1.initializeTopology();
+        EasyMock.expectLastCall().once();
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
         EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
     }
@@ -231,6 +241,8 @@ public class AssignedTasksTest {
         mockRunningTaskSuspension();
         t1.resume();
         EasyMock.expectLastCall();
+        t1.initializeTopology();
+        EasyMock.expectLastCall().once();
         EasyMock.replay(t1);
 
         suspendTask();
@@ -422,11 +434,13 @@ public class AssignedTasksTest {
     }
 
     private void mockRunningTaskSuspension() {
-        EasyMock.expect(t1.initialize()).andReturn(true);
+        EasyMock.expect(t1.initializeStateStores()).andReturn(true);
+        t1.initializeTopology();
+        EasyMock.expectLastCall().once();
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
         EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList()).anyTimes();
         t1.suspend();
         EasyMock.expectLastCall();
     }
 
-}
\ No newline at end of file
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 329aab2..d86840c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -239,7 +239,7 @@ public class StandbyTaskTest {
 
         StreamsConfig config = createConfig(baseDir);
         StandbyTask task = new StandbyTask(taskId, applicationId, ktablePartitions, ktableTopology, consumer, changelogReader, config, null, stateDirectory);
-        task.initialize();
+        task.initializeStateStores();
         restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet()));
 
         for (ConsumerRecord<Integer, Integer> record : Arrays.asList(
@@ -359,7 +359,7 @@ public class StandbyTaskTest {
                                                  null,
                                                  stateDirectory
         );
-        task.initialize();
+        task.initializeStateStores();
 
 
         restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet()));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 4bfa0a2..25de93c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -146,7 +146,8 @@ public class StreamTaskTest {
         stateDirectory = new StateDirectory("applicationId", baseDir.getPath(), new MockTime());
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer,
                               changelogReader, config, streamsMetrics, stateDirectory, null, time, producer);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
     }
 
     @After
@@ -377,7 +378,9 @@ public class StreamTaskTest {
 
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, config,
             streamsMetrics, stateDirectory, null, time, producer);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
+
         final int offset = 20;
         task.addRecords(partition1, Collections.singletonList(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), offset, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -495,7 +498,8 @@ public class StreamTaskTest {
                 };
             }
         };
-        streamTask.initialize();
+        streamTask.initializeStateStores();
+        streamTask.initializeTopology();
 
         time.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
 
@@ -613,7 +617,8 @@ public class StreamTaskTest {
     public void shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseButStillCloseAllProcessorNodesTopology() throws Exception {
         task.close(true, false);
         task = createTaskThatThrowsExceptionOnClose();
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         try {
             task.close(true, false);
             fail("should have thrown runtime exception");
@@ -835,7 +840,7 @@ public class StreamTaskTest {
                                                time,
                                                producer);
 
-        assertTrue(task.initialize());
+        assertTrue(task.initializeStateStores());
     }
 
     @Test
@@ -863,7 +868,7 @@ public class StreamTaskTest {
                                                time,
                                                producer);
 
-        assertFalse(task.initialize());
+        assertFalse(task.initializeStateStores());
     }
 
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index e24f535..07d67bf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -103,12 +103,12 @@ public class StreamThreadStateStoreProviderTest {
         stateDirectory = new StateDirectory(applicationId, stateConfigDir, new MockTime());
         taskOne = createStreamsTask(applicationId, streamsConfig, clientSupplier, topology,
                                     new TaskId(0, 0));
-        taskOne.initialize();
+        taskOne.initializeStateStores();
         tasks.put(new TaskId(0, 0),
                   taskOne);
         taskTwo = createStreamsTask(applicationId, streamsConfig, clientSupplier, topology,
                                     new TaskId(0, 1));
-        taskTwo.initialize();
+        taskTwo.initializeStateStores();
         tasks.put(new TaskId(0, 1),
                   taskTwo);
 
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 228171a..87dedfe 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -222,7 +222,8 @@ public class ProcessorTopologyTestDriver {
                                   cache,
                                   new MockTime(),
                                   producer);
-            task.initialize();
+            task.initializeStateStores();
+            task.initializeTopology();
         }
     }
 

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.