Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/05 23:48:50 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #8818: KAFKA-10086: Integration test for ensuring warmups are effective

ableegoldman commented on a change in pull request #8818:
URL: https://github.com/apache/kafka/pull/8818#discussion_r436196593



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -514,17 +515,24 @@ void handleLostAll() {
 
     /**
      * Compute the offset total summed across all stores in a task. Includes offset sum for any tasks we own the
-     * lock for, which includes assigned and unassigned tasks we locked in {@link #tryToLockAllNonEmptyTaskDirectories()}
-     *
-     * @return Map from task id to its total offset summed across all state stores
+     * lock for, which includes assigned and unassigned tasks we locked in {@link #tryToLockAllNonEmptyTaskDirectories()}.
+     * Does not include stateless or non-logged tasks.
      */
     public Map<TaskId, Long> getTaskOffsetSums() {
         final Map<TaskId, Long> taskOffsetSums = new HashMap<>();
 
-        for (final TaskId id : lockedTaskDirectories) {
+        // Not all tasks will create directories, and there may be directories for tasks we don't currently own,
+        // so we consider all tasks that are either owned or on disk. This includes stateless tasks, which should
+        // just have an empty changelogOffsets map.
+        for (final TaskId id : union(HashSet::new, lockedTaskDirectories, tasks.keySet())) {
             final Task task = tasks.get(id);
             if (task != null) {
-                taskOffsetSums.put(id, sumOfChangelogOffsets(id, task.changelogOffsets()));
+                final Map<TopicPartition, Long> changelogOffsets = task.changelogOffsets();
+                if (changelogOffsets.isEmpty()) {
+                    log.debug("Skipping to encode apparently stateless (or non-logged) offset sum for task {}", id);

Review comment:
       "apparently" ? 🤔 

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1084,12 +1088,15 @@ private boolean populateActiveTaskAndPartitionsLists(final List<TopicPartition>
                 // If the partition is new to this consumer but is still owned by another, remove from the assignment
                 // until it has been revoked and can safely be reassigned according to the COOPERATIVE protocol
                 if (newPartitionForConsumer && allOwnedPartitions.contains(partition)) {
-                    log.info("Removing task {} from assignment until it is safely revoked in followup rebalance", taskId);
-                    clientState.unassignActive(taskId);

Review comment:
       I guess it was nice to have as a sanity check that the task really was assigned to this client, and that we haven't already tried to remove it or something. But maybe we can take that for granted...? (I only mention it because it helped me find a bug in the thread-level stickiness PR)
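
For context on the removed call, a rough sketch of the kind of sanity check being described: fail loudly if the task is not actually in this client's active assignment before removing it. Method and field names here are illustrative, not the real ClientState code.

    import java.util.Set;

    public class UnassignSketch {

        // illustrative stand-in for ClientState's active-task bookkeeping
        static void unassignActive(final String taskId, final Set<String> assignedActiveTasks) {
            // the sanity check: the task must currently be assigned to this client
            if (!assignedActiveTasks.contains(taskId)) {
                throw new IllegalStateException(
                    "Task " + taskId + " was not assigned to this client, cannot unassign it");
            }
            assignedActiveTasks.remove(taskId);
        }
    }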

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
##########
@@ -142,4 +169,184 @@ public void shouldProperlyConfigureTheAssignor() throws NoSuchFieldException, Il
             assertThat(taskAssignor, instanceOf(MyTaskAssignor.class));
         }
     }
+
+    @Test
+    public void shouldScaleOutWithWarmupTasksAndInMemoryStores() throws InterruptedException {
+        // NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum
+        // value is one minute
+        shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.inMemoryKeyValueStore(storeName)));
+    }
+
+    @Test
+    public void shouldScaleOutWithWarmupTasksAndPersistentStores() throws InterruptedException {
+        // NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum
+        // value is one minute
+        shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.persistentKeyValueStore(storeName)));
+    }
+
+    private void shouldScaleOutWithWarmupTasks(final Function<String, Materialized<Object, Object, KeyValueStore<Bytes, byte[]>>> materializedFunction) throws InterruptedException {
+        final String testId = safeUniqueTestName(getClass(), testName);
+        final String appId = "appId_" + System.currentTimeMillis() + "_" + testId;
+        final String inputTopic = "input" + testId;
+        final String storeName = "store" + testId;
+        final String storeChangelog = appId + "-store" + testId + "-changelog";
+        final Set<TopicPartition> changelogTopicPartitions = mkSet(
+            new TopicPartition(storeChangelog, 0),
+            new TopicPartition(storeChangelog, 1)
+        );
+
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, 2, inputTopic, storeChangelog);
+
+        final ReentrantLock assignmentLock = new ReentrantLock();
+        final AtomicInteger assignmentsCompleted = new AtomicInteger(0);
+        final AtomicBoolean assignmentStable = new AtomicBoolean(false);
+        final AssignmentListener assignmentListener =
+            stable -> {
+                assignmentLock.lock();
+                try {
+                    assignmentsCompleted.incrementAndGet();
+                    assignmentStable.set(stable);
+                } finally {
+                    assignmentLock.unlock();
+                }
+            };
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.table(inputTopic, materializedFunction.apply(storeName));
+        final Topology topology = builder.build();
+
+        final Properties producerProperties = mkProperties(
+            mkMap(
+                mkEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                mkEntry(ProducerConfig.ACKS_CONFIG, "all"),
+                mkEntry(ProducerConfig.RETRIES_CONFIG, "0"),
+                mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()),
+                mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
+            )
+        );
+
+        final StringBuilder kiloBuilder = new StringBuilder(1000);
+        for (int i = 0; i < 1000; i++) {
+            kiloBuilder.append('0');
+        }
+        final String kilo = kiloBuilder.toString();
+
+        try (final Producer<String, String> producer = new KafkaProducer<>(producerProperties)) {
+            for (int i = 0; i < 1000; i++) {
+                producer.send(new ProducerRecord<>(inputTopic, String.valueOf(i), kilo));
+            }
+        }
+
+        final Properties consumerProperties = mkProperties(
+            mkMap(
+                mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()),
+                mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
+            )
+        );
+
+
+        try (final KafkaStreams kafkaStreams0 = new KafkaStreams(topology, streamsProperties(appId, assignmentListener));
+             final KafkaStreams kafkaStreams1 = new KafkaStreams(topology, streamsProperties(appId, assignmentListener));
+             final Consumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
+            kafkaStreams0.start();
+
+            // wait until all the input records are in the changelog
+            TestUtils.waitForCondition(
+                () -> getChangelogOffsetSum(changelogTopicPartitions, consumer) == 1000,
+                120_000L,
+                () -> "Input records haven't all been written to the changelog: " + getChangelogOffsetSum(changelogTopicPartitions, consumer)
+            );
+
+            final AtomicLong instance1TotalRestored = new AtomicLong(-1);
+            final CountDownLatch restoreCompleteLatch = new CountDownLatch(1);
+            kafkaStreams1.setGlobalStateRestoreListener(new StateRestoreListener() {
+                @Override
+                public void onRestoreStart(final TopicPartition topicPartition,
+                                           final String storeName,
+                                           final long startingOffset,
+                                           final long endingOffset) {
+                }
+
+                @Override
+                public void onBatchRestored(final TopicPartition topicPartition,
+                                            final String storeName,
+                                            final long batchEndOffset,
+                                            final long numRestored) {
+                }

Review comment:
       Should we also add up the records restored here, since `onRestoreEnd` is not invoked if the task is closed during restoration?
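
A rough sketch of that suggestion, assuming the test's existing imports: tally restored records per batch so the total survives even when onRestoreEnd never fires (for example when the task is closed mid-restoration). Starting the counter at 0 instead of -1 is part of this assumption.

    final AtomicLong instance1TotalRestored = new AtomicLong(0);
    kafkaStreams1.setGlobalStateRestoreListener(new StateRestoreListener() {
        @Override
        public void onRestoreStart(final TopicPartition topicPartition,
                                   final String storeName,
                                   final long startingOffset,
                                   final long endingOffset) { }

        @Override
        public void onBatchRestored(final TopicPartition topicPartition,
                                    final String storeName,
                                    final long batchEndOffset,
                                    final long numRestored) {
            // accumulate here rather than only in onRestoreEnd
            instance1TotalRestored.addAndGet(numRestored);
        }

        @Override
        public void onRestoreEnd(final TopicPartition topicPartition,
                                 final String storeName,
                                 final long totalRestored) { }
    });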

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
##########
@@ -142,4 +169,184 @@ public void shouldProperlyConfigureTheAssignor() throws NoSuchFieldException, Il
             assertThat(taskAssignor, instanceOf(MyTaskAssignor.class));
         }
     }
+
+    @Test
+    public void shouldScaleOutWithWarmupTasksAndInMemoryStores() throws InterruptedException {
+        // NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum
+        // value is one minute
+        shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.inMemoryKeyValueStore(storeName)));
+    }
+
+    @Test
+    public void shouldScaleOutWithWarmupTasksAndPersistentStores() throws InterruptedException {
+        // NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum

Review comment:
       What is `NB`?
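
On the one-minute floor mentioned in the quoted comment: a hedged sketch of how a test's Streams properties might pin the probing rebalance interval at its minimum and force warmup tasks. StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG and StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG are the KIP-441 config keys as I understand them; the surrounding values and property set are illustrative, not the actual streamsProperties() used by this test.

    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    // no lag is "acceptable", so the assignor must plan warmup tasks for the new instance
    props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, "0");
    // one minute is the configured floor for probing rebalances,
    // which is why the test cannot complete any faster
    props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, Long.toString(60 * 1000L));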




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org