You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/04/07 05:07:59 UTC

[kafka] branch 1.1 updated: KAFKA-7192: Cherry-pick 5430 to 1.1 (#6546)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 0e6e58d  KAFKA-7192: Cherry-pick 5430 to 1.1 (#6546)
0e6e58d is described below

commit 0e6e58d04cc83d6f46ee62575432ae5301a66bf4
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Sat Apr 6 22:07:40 2019 -0700

    KAFKA-7192: Cherry-pick 5430 to 1.1 (#6546)
    
    The first PR for KAFKA-7192 was cherry-picked to 1.1, but the follow-up (#5430) was not. This is causing flaky EOS system test failures.
    
    Some test results:
    
    In the 2.0 branch, running 25 times (streams_eos_test has 4 tests, so 100 test runs in total), no failures:
    
    http://confluent-kafka-2-0-system-test-results.s3-us-west-2.amazonaws.com/2019-04-05--001.1554466177--apache--2.0--db22e3d/report.html
    
    In the 1.1 branch before this PR, running 5 times, 10 tests failed:
    
    http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2019-04-02--001.1554239700--guozhangwang--KMinor-1.1-eos-test--8395fce/report.html
    
    In this branch (after this PR), running 25 times, no failures:
    
    http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2019-04-05--001.1554465488--guozhangwang--KMinor-1.1-eos-test--897aa03/report.html
    
    Reviewers: Bill Bejeck <bb...@gmail.com>
---
 .../processor/internals/StoreChangelogReader.java  | 149 +++++++++------------
 .../streams/processor/internals/StreamTask.java    |   4 +
 .../internals/StoreChangelogReaderTest.java        |  99 +++-----------
 tests/kafkatest/services/streams.py                |   2 +-
 4 files changed, 89 insertions(+), 165 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 501ddef..8ade2de 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -48,8 +48,9 @@ public class StoreChangelogReader implements ChangelogReader {
     private final Map<TopicPartition, Long> endOffsets = new HashMap<>();
     private final Map<String, List<PartitionInfo>> partitionInfo = new HashMap<>();
     private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>();
-    private final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<>();
-    private final Map<TopicPartition, StateRestorer> needsInitializing = new HashMap<>();
+    private final Set<TopicPartition> needsRestoring = new HashSet<>();
+    private final Set<TopicPartition> needsInitializing = new HashSet<>();
+    private final Set<TopicPartition> completedRestorers = new HashSet<>();
 
     public StoreChangelogReader(final Consumer<byte[], byte[]> restoreConsumer,
                                 final StateRestoreListener userStateRestoreListener,
@@ -61,14 +62,14 @@ public class StoreChangelogReader implements ChangelogReader {
 
     @Override
     public void register(final StateRestorer restorer) {
-        final StateRestorer existingRestorer = stateRestorers.get(restorer.partition());
-        if (existingRestorer == null) {
+        if (!stateRestorers.containsKey(restorer.partition())) {
             restorer.setUserRestoreListener(userStateRestoreListener);
             stateRestorers.put(restorer.partition(), restorer);
-            needsInitializing.put(restorer.partition(), restorer);
-        } else {
-            needsInitializing.put(restorer.partition(), existingRestorer);
+
+            log.trace("Added restorer for changelog {}", restorer.partition());
         }
+
+        needsInitializing.add(restorer.partition());
     }
 
     /**
@@ -84,18 +85,29 @@ public class StoreChangelogReader implements ChangelogReader {
             return completed();
         }
 
-        final Set<TopicPartition> restoringPartitions = new HashSet<>(needsRestoring.keySet());
         try {
-            final ConsumerRecords<byte[], byte[]> allRecords = restoreConsumer.poll(10);
-            for (final TopicPartition partition : restoringPartitions) {
-                restorePartition(allRecords, partition, active.restoringTaskFor(partition));
+            final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(10);
+
+            for (final TopicPartition partition : needsRestoring) {
+                final StateRestorer restorer = stateRestorers.get(partition);
+                final long pos = processNext(records.records(partition), restorer, endOffsets.get(partition));
+                restorer.setRestoredOffset(pos);
+                if (restorer.hasCompleted(pos, endOffsets.get(partition))) {
+                    restorer.restoreDone();
+                    endOffsets.remove(partition);
+                    completedRestorers.add(partition);
+                }
             }
         } catch (final InvalidOffsetException recoverableException) {
             log.warn("Restoring StreamTasks failed. Deleting StreamTasks stores to recreate from scratch.", recoverableException);
             final Set<TopicPartition> partitions = recoverableException.partitions();
             for (final TopicPartition partition : partitions) {
                 final StreamTask task = active.restoringTaskFor(partition);
-                log.info("Reinitializing StreamTask {}", task);
+
+                log.info("Reinitializing StreamTask {} for changelog {}", task, partition);
+
+                needsInitializing.remove(partition);
+                needsRestoring.remove(partition);
 
                 final StateRestorer restorer = stateRestorers.get(partition);
                 restorer.setCheckpointOffset(StateRestorer.NO_CHECKPOINT);
@@ -104,6 +116,8 @@ public class StoreChangelogReader implements ChangelogReader {
             restoreConsumer.seekToBeginning(partitions);
         }
 
+        needsRestoring.removeAll(completedRestorers);
+
         if (needsRestoring.isEmpty()) {
             restoreConsumer.unsubscribe();
         }
@@ -120,25 +134,24 @@ public class StoreChangelogReader implements ChangelogReader {
         // the needsInitializing map is not empty, meaning we do not know the metadata for some of them yet
         refreshChangelogInfo();
 
-        final Map<TopicPartition, StateRestorer> initializable = new HashMap<>();
-        for (final Map.Entry<TopicPartition, StateRestorer> entry : needsInitializing.entrySet()) {
-            final TopicPartition topicPartition = entry.getKey();
+        final Set<TopicPartition> initializable = new HashSet<>();
+        for (final TopicPartition topicPartition : needsInitializing) {
             if (hasPartition(topicPartition)) {
-                initializable.put(entry.getKey(), entry.getValue());
+                initializable.add(topicPartition);
             }
         }
 
         // try to fetch end offsets for the initializable restorers and remove any partitions
         // where we already have all of the data
         try {
-            endOffsets.putAll(restoreConsumer.endOffsets(initializable.keySet()));
+            endOffsets.putAll(restoreConsumer.endOffsets(initializable));
         } catch (final TimeoutException e) {
             // if timeout exception gets thrown we just give up this time and retry in the next run loop
             log.debug("Could not fetch end offset for {}; will fall back to partition by partition fetching", initializable);
             return;
         }
 
-        final Iterator<TopicPartition> iter = initializable.keySet().iterator();
+        final Iterator<TopicPartition> iter = initializable.iterator();
         while (iter.hasNext()) {
             final TopicPartition topicPartition = iter.next();
             final Long endOffset = endOffsets.get(topicPartition);
@@ -146,13 +159,15 @@ public class StoreChangelogReader implements ChangelogReader {
             // offset should not be null; but since the consumer API does not guarantee it
             // we add this check just in case
             if (endOffset != null) {
-                final StateRestorer restorer = needsInitializing.get(topicPartition);
+                final StateRestorer restorer = stateRestorers.get(topicPartition);
                 if (restorer.checkpoint() >= endOffset) {
                     restorer.setRestoredOffset(restorer.checkpoint());
                     iter.remove();
+                    completedRestorers.add(topicPartition);
                 } else if (restorer.offsetLimit() == 0 || endOffset == 0) {
                     restorer.setRestoredOffset(0);
                     iter.remove();
+                    completedRestorers.add(topicPartition);
                 } else {
                     restorer.setEndingOffset(endOffset);
                 }
@@ -169,55 +184,59 @@ public class StoreChangelogReader implements ChangelogReader {
         }
     }
 
-    private void startRestoration(final Map<TopicPartition, StateRestorer> initialized,
+    private void startRestoration(final Set<TopicPartition> initialized,
                                   final RestoringTasks active) {
-        log.debug("Start restoring state stores from changelog topics {}", initialized.keySet());
+        log.debug("Start restoring state stores from changelog topics {}", initialized);
 
         final Set<TopicPartition> assignment = new HashSet<>(restoreConsumer.assignment());
-        assignment.addAll(initialized.keySet());
+        assignment.addAll(initialized);
         restoreConsumer.assign(assignment);
 
         final List<StateRestorer> needsPositionUpdate = new ArrayList<>();
-        for (final StateRestorer restorer : initialized.values()) {
-            final TopicPartition restoringPartition = restorer.partition();
+
+        for (final TopicPartition partition : initialized) {
+            final StateRestorer restorer = stateRestorers.get(partition);
             if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) {
-                restoreConsumer.seek(restoringPartition, restorer.checkpoint());
-                logRestoreOffsets(restoringPartition,
-                                  restorer.checkpoint(),
-                                  endOffsets.get(restoringPartition));
-                restorer.setStartingOffset(restoreConsumer.position(restoringPartition));
+                restoreConsumer.seek(partition, restorer.checkpoint());
+                logRestoreOffsets(partition,
+                        restorer.checkpoint(),
+                        endOffsets.get(partition));
+                restorer.setStartingOffset(restoreConsumer.position(partition));
                 restorer.restoreStarted();
             } else {
-                restoreConsumer.seekToBeginning(Collections.singletonList(restoringPartition));
+                restoreConsumer.seekToBeginning(Collections.singletonList(partition));
                 needsPositionUpdate.add(restorer);
             }
         }
 
         for (final StateRestorer restorer : needsPositionUpdate) {
-            final TopicPartition restoringPartition = restorer.partition();
-            final StreamTask task = active.restoringTaskFor(restoringPartition);
+            final TopicPartition partition = restorer.partition();
+
             // If checkpoint does not exist it means the task was not shutdown gracefully before;
             // and in this case if EOS is turned on we should wipe out the state and re-initialize the task
-            if (task.eosEnabled) {
+            final StreamTask task = active.restoringTaskFor(partition);
+            if (task.isEosEnabled()) {
                 log.info("No checkpoint found for task {} state store {} changelog {} with EOS turned on. " +
-                    "Reinitializing the task and restore its state from the beginning.", task.id, restorer.storeName(), restoringPartition);
-                // we move the partitions here, because they will be added back within
-                // `task.reinitializeStateStoresForPartitions()` that calls `register()` internally again
-                needsInitializing.remove(restoringPartition);
-                restorer.setCheckpointOffset(restoreConsumer.position(restoringPartition));
-                task.reinitializeStateStoresForPartitions(Collections.singleton(restoringPartition));
+                        "Reinitializing the task and restore its state from the beginning.", task.id, restorer.storeName(), partition);
+
+                needsInitializing.remove(partition);
+                initialized.remove(partition);
+                restorer.setCheckpointOffset(restoreConsumer.position(partition));
+
+                task.reinitializeStateStoresForPartitions(Collections.singleton(partition));
             } else {
-                log.info("Restoring task {}'s state store {} from beginning of the changelog {} ", task.id, restorer.storeName(), restoringPartition);
-                final long position = restoreConsumer.position(restoringPartition);
-                logRestoreOffsets(restoringPartition,
-                    position,
-                    endOffsets.get(restoringPartition));
+                log.info("Restoring task {}'s state store {} from beginning of the changelog {} ", task.id, restorer.storeName(), partition);
+
+                final long position = restoreConsumer.position(restorer.partition());
+                logRestoreOffsets(restorer.partition(),
+                        position,
+                        endOffsets.get(restorer.partition()));
                 restorer.setStartingOffset(position);
                 restorer.restoreStarted();
             }
         }
 
-        needsRestoring.putAll(initialized);
+        needsRestoring.addAll(initialized);
     }
 
     private void logRestoreOffsets(final TopicPartition partition,
@@ -230,10 +249,7 @@ public class StoreChangelogReader implements ChangelogReader {
     }
 
     private Collection<TopicPartition> completed() {
-        final Set<TopicPartition> completed = new HashSet<>(stateRestorers.keySet());
-        completed.removeAll(needsRestoring.keySet());
-        log.trace("The set of restoration completed partitions so far: {}", completed);
-        return completed;
+        return completedRestorers;
     }
 
     private void refreshChangelogInfo() {
@@ -265,41 +281,6 @@ public class StoreChangelogReader implements ChangelogReader {
         needsInitializing.clear();
     }
 
-    /**
-     * @throws TaskMigratedException if another thread wrote to the changelog topic that is currently restored
-     */
-    private void restorePartition(final ConsumerRecords<byte[], byte[]> allRecords,
-                                  final TopicPartition topicPartition,
-                                  final Task task) {
-        final StateRestorer restorer = stateRestorers.get(topicPartition);
-        final Long endOffset = endOffsets.get(topicPartition);
-        final long pos = processNext(allRecords.records(topicPartition), restorer, endOffset);
-        restorer.setRestoredOffset(pos);
-        if (restorer.hasCompleted(pos, endOffset)) {
-            if (pos > endOffset) {
-                throw new TaskMigratedException(task, topicPartition, endOffset, pos);
-            }
-
-            // need to check for changelog topic
-            if (restorer.offsetLimit() == Long.MAX_VALUE) {
-                final Long updatedEndOffset = restoreConsumer.endOffsets(Collections.singletonList(topicPartition)).get(topicPartition);
-                if (!restorer.hasCompleted(pos, updatedEndOffset)) {
-                    throw new TaskMigratedException(task, topicPartition, updatedEndOffset, pos);
-                }
-            }
-
-
-            log.debug("Completed restoring state from changelog {} with {} records ranging from offset {} to {}",
-                      topicPartition,
-                      restorer.restoredNumRecords(),
-                      restorer.startingOffset(),
-                      restorer.restoredOffset());
-
-            restorer.restoreDone();
-            needsRestoring.remove(topicPartition);
-        }
-    }
-
     private long processNext(final List<ConsumerRecord<byte[], byte[]>> records,
                              final StateRestorer restorer,
                              final Long endOffset) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 6dd4726..6e40c99 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -162,6 +162,10 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         }
     }
 
+    public boolean isEosEnabled() {
+        return eosEnabled;
+    }
+
     @Override
     public boolean initializeStateStores() {
         log.trace("Initializing state stores");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 5391f31..0db0216 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.test.MockRestoreCallback;
 import org.apache.kafka.test.MockStateRestoreListener;
@@ -103,6 +102,7 @@ public class StoreChangelogReaderTest {
         expect(mockRestorer.partition())
             .andReturn(new TopicPartition("sometopic", 0))
             .andReturn(new TopicPartition("sometopic", 0))
+            .andReturn(new TopicPartition("sometopic", 0))
             .andReturn(new TopicPartition("sometopic", 0));
         EasyMock.replay(mockRestorer);
         changelogReader.register(mockRestorer);
@@ -142,11 +142,15 @@ public class StoreChangelogReaderTest {
         });
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
 
-        EasyMock.expect(active.restoringTaskFor(topicPartition)).andReturn(task).andReturn(task);
-        EasyMock.replay(active);
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task).anyTimes();
+        expect(task.isEosEnabled()).andReturn(false).anyTimes();
+        replay(active, task);
 
         // first restore call "fails" but we should not die with an exception
         assertEquals(0, changelogReader.restore(active).size());
+
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
+                "storeName"));
         // retry restore should succeed
         assertEquals(1, changelogReader.restore(active).size());
         assertThat(callback.restored.size(), equalTo(messages));
@@ -246,10 +250,11 @@ public class StoreChangelogReaderTest {
         changelogReader.register(new StateRestorer(one, restoreListener1, null, Long.MAX_VALUE, true, "storeName2"));
         changelogReader.register(new StateRestorer(two, restoreListener2, null, Long.MAX_VALUE, true, "storeName3"));
 
+        expect(task.isEosEnabled()).andReturn(false).anyTimes();
         expect(active.restoringTaskFor(one)).andReturn(task);
         expect(active.restoringTaskFor(two)).andReturn(task);
-        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
-        replay(active);
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task).anyTimes();
+        replay(active, task);
         changelogReader.restore(active);
 
         assertThat(callback.restored.size(), equalTo(10));
@@ -270,9 +275,9 @@ public class StoreChangelogReaderTest {
         setupConsumer(3, two);
 
         changelogReader
-            .register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName1"));
-        changelogReader.register(new StateRestorer(one, restoreListener1, null, Long.MAX_VALUE, true, "storeName2"));
-        changelogReader.register(new StateRestorer(two, restoreListener2, null, Long.MAX_VALUE, true, "storeName3"));
+            .register(new StateRestorer(topicPartition, restoreListener, 0L, Long.MAX_VALUE, true, "storeName1"));
+        changelogReader.register(new StateRestorer(one, restoreListener1, 0L, Long.MAX_VALUE, true, "storeName2"));
+        changelogReader.register(new StateRestorer(two, restoreListener2, 0L, Long.MAX_VALUE, true, "storeName3"));
 
         expect(active.restoringTaskFor(one)).andReturn(task);
         expect(active.restoringTaskFor(two)).andReturn(task);
@@ -297,11 +302,10 @@ public class StoreChangelogReaderTest {
     @Test
     public void shouldOnlyReportTheLastRestoredOffset() {
         setupConsumer(10, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 5, true, "storeName1"));
-
-        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
-        replay(active);
-
+        changelogReader
+            .register(new StateRestorer(topicPartition, restoreListener, 0L, 5, true, "storeName1"));
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        replay(active, task);
         changelogReader.restore(active);
 
         assertThat(callback.restored.size(), equalTo(5));
@@ -423,7 +427,8 @@ public class StoreChangelogReaderTest {
         expect(active.restoringTaskFor(topicPartition)).andReturn(task);
         expect(active.restoringTaskFor(postInitialization)).andReturn(task);
         expect(active.restoringTaskFor(topicPartition)).andReturn(task);
-        replay(active);
+        expect(task.isEosEnabled()).andReturn(false).anyTimes();
+        replay(active, task);
 
         assertTrue(changelogReader.restore(active).isEmpty());
 
@@ -444,47 +449,6 @@ public class StoreChangelogReaderTest {
     }
 
     @Test
-    public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForChangelogTopic() {
-        final int messages = 10;
-        setupConsumer(messages, topicPartition);
-        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 5L));
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
-
-        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
-        replay(active);
-
-        try {
-            changelogReader.restore(active);
-            fail("Should have thrown TaskMigratedException");
-        } catch (final TaskMigratedException expected) {
-            /* ignore */
-        }
-    }
-
-
-    @Test
-    public void shouldThrowTaskMigratedExceptionIfChangelogTopicUpdatedDuringRestoreProcessFoundInSecondCheck() {
-        final int messages = 10;
-        setupConsumer(messages, topicPartition);
-        // in this case first call to endOffsets returns correct value, but a second thread has updated the changelog topic
-        // so a subsequent call to endOffsets returns a value exceeding the expected end value
-        consumer.addEndOffsets(Collections.singletonMap(topicPartition, 15L));
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
-
-        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
-        replay(active);
-
-        try {
-            changelogReader.restore(active);
-            fail("Should have thrown TaskMigratedException");
-        } catch (final TaskMigratedException expected) {
-            // verifies second block threw exception with updated end offset
-            assertTrue(expected.getMessage().contains("end offset 15, current offset 10"));
-        }
-    }
-
-
-    @Test
     public void shouldNotThrowTaskMigratedExceptionIfSourceTopicUpdatedDuringRestoreProcess() {
         final int messages = 10;
         setupConsumer(messages, topicPartition);
@@ -499,31 +463,6 @@ public class StoreChangelogReaderTest {
         changelogReader.restore(active);
     }
 
-
-    @Test
-    public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForChangelogTopicEOSEnabled() {
-        final int totalMessages = 10;
-        assignPartition(totalMessages, topicPartition);
-        // records 0..4
-        addRecords(5, topicPartition, 0);
-        //EOS enabled commit marker at offset 5 so rest of records 6..10
-        addRecords(5, topicPartition, 6);
-        consumer.assign(Collections.<TopicPartition>emptyList());
-
-        // end offsets should start after commit marker of 5 from above
-        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 6L));
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
-
-        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
-        replay(active);
-        try {
-            changelogReader.restore(active);
-            fail("Should have thrown task migrated exception");
-        } catch (final TaskMigratedException expected) {
-            /* ignore */
-        }
-    }
-
     @Test
     public void shouldNotThrowTaskMigratedExceptionDuringRestoreForChangelogTopicWhenEndOffsetNotExceededEOSEnabled() {
         final int totalMessages = 10;
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 0ed3a42..e14f25e 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -276,7 +276,7 @@ class StreamsEosTestBaseService(StreamsTestBaseService):
 
     def clean_node(self, node):
         if self.clean_node_enabled:
-            super.clean_node(self, node)
+            super(StreamsEosTestBaseService, self).clean_node(node)
 
 
 class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):