You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/08/07 15:14:09 UTC
[kafka] branch 1.1 updated: KAFKA-8602: Backport bugfix for standby
task creation (#7147)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new f6316c3 KAFKA-8602: Backport bugfix for standby task creation (#7147)
f6316c3 is described below
commit f6316c39b9d2d9630066416d15eca7ad5cec99fb
Author: cadonna <br...@confluent.io>
AuthorDate: Wed Aug 7 17:13:40 2019 +0200
KAFKA-8602: Backport bugfix for standby task creation (#7147)
Backports bugfix in standby task creation from PR #7008.
A separate PR is needed because some tests in the original PR
use topology optimizations and mocks that were introduced afterwards.
Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../streams/processor/internals/StreamThread.java | 20 ++-
.../StandbyTaskCreationIntegrationTest.java | 189 +++++++++++++++++++++
.../processor/internals/StreamThreadTest.java | 133 +++++++++++++--
3 files changed, 318 insertions(+), 24 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index a5199da..77771cc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -472,15 +472,17 @@ public class StreamThread extends Thread {
final ProcessorTopology topology = builder.build(taskId.topicGroupId);
- if (!topology.stateStores().isEmpty()) {
- return new StandbyTask(taskId,
- partitions,
- topology,
- consumer,
- storeChangelogReader,
- config,
- streamsMetrics,
- stateDirectory);
+ if (!topology.stateStores().isEmpty() && !topology.storeToChangelogTopic().isEmpty()) {
+ return new StandbyTask(
+ taskId,
+ partitions,
+ topology,
+ consumer,
+ storeChangelogReader,
+ config,
+ streamsMetrics,
+ stateDirectory
+ );
} else {
log.trace("Skipped standby task {} with assigned partitions {} " +
"since it does not have any state stores to materialize", taskId, partitions);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
new file mode 100644
index 0000000..ed2781f
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.KafkaStreams.StateListener;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ThreadMetadata;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Properties;
+
+/**
+ * Integration test that starts two Kafka Streams clients sharing one application id, each
+ * configured with one standby replica, and checks which kinds of tasks the clients host
+ * once they reach {@code RUNNING}.
+ */
+@Category({IntegrationTest.class})
+public class StandbyTaskCreationIntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+    private static final String INPUT_TOPIC = "input-topic";
+
+    private KafkaStreams client1;
+    private KafkaStreams client2;
+    // Written from the state listener callbacks and polled by the wait condition.
+    private volatile boolean client1IsOk = false;
+    private volatile boolean client2IsOk = false;
+
+    /** Creates the input topic on the embedded cluster before any test runs. */
+    @BeforeClass
+    public static void createTopics() throws InterruptedException {
+        CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
+    }
+
+    /**
+     * Closes both clients. A client is still {@code null} when the test failed before
+     * {@link #createClients} ran; skipping it keeps a NullPointerException raised here from
+     * masking the original failure. The second client is closed even if closing the first throws.
+     */
+    @After
+    public void after() {
+        try {
+            if (client1 != null) {
+                client1.close();
+            }
+        } finally {
+            if (client2 != null) {
+                client2.close();
+            }
+        }
+    }
+
+    /**
+     * Builds a fresh configuration for one client: fixed application id, the embedded cluster
+     * as bootstrap servers, a new temporary state directory, Integer serdes as defaults and
+     * one standby replica.
+     */
+    private Properties streamsConfiguration() {
+        final String applicationId = "testApp";
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+        return streamsConfiguration;
+    }
+
+    /**
+     * Connects a no-op transformer to a persistent key-value store whose logging is disabled,
+     * runs two clients on that topology and waits until both are RUNNING with active tasks
+     * and without standby tasks.
+     */
+    @Test
+    public void shouldNotCreateAnyStandByTasksForStateStoreWithLoggingDisabled() throws Exception {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String stateStoreName = "myTransformState";
+        final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder =
+            Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
+                                        Serdes.Integer(),
+                                        Serdes.Integer()).withLoggingDisabled();
+        builder.addStateStore(keyValueStoreBuilder);
+        builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer()))
+            .transform(new TransformerSupplier<Integer, Integer, KeyValue<Integer, Integer>>() {
+                @Override
+                public Transformer<Integer, Integer, KeyValue<Integer, Integer>> get() {
+                    // The transformer does nothing; it only exists to connect the store.
+                    return new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
+                        @Override
+                        public void init(final ProcessorContext context) {}
+
+                        @Override
+                        public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
+                            return null;
+                        }
+
+                        @Override
+                        public KeyValue<Integer, Integer> punctuate(final long timestamp) {
+                            return null;
+                        }
+
+                        @Override
+                        public void close() {}
+                    };
+                }
+            }, stateStoreName);
+
+        final Topology topology = builder.build();
+        createClients(topology, streamsConfiguration(), topology, streamsConfiguration());
+
+        setStateListenersForVerification();
+
+        startClients();
+
+        waitUntilBothClientAreOK(
+            "At least one client did not reach state RUNNING with active tasks but no stand-by tasks"
+        );
+    }
+
+    /** Instantiates both clients from their topology and configuration without starting them. */
+    private void createClients(final Topology topology1,
+                               final Properties streamsConfiguration1,
+                               final Topology topology2,
+                               final Properties streamsConfiguration2) {
+
+        client1 = new KafkaStreams(topology1, streamsConfiguration1);
+        client2 = new KafkaStreams(topology2, streamsConfiguration2);
+    }
+
+    /**
+     * Registers a listener on each client that re-evaluates the client's OK flag every time
+     * the client enters RUNNING. The verdict is computed first and then published with a
+     * single write, so the polling thread can never observe an intermediate {@code true}
+     * for a client that actually hosts standby tasks or lacks active tasks.
+     */
+    private void setStateListenersForVerification() {
+        client1.setStateListener(new StateListener() {
+            @Override
+            public void onChange(final State newState, final State oldState) {
+                if (newState == State.RUNNING) {
+                    client1IsOk = hasOnlyActiveTasks(client1);
+                }
+            }
+        });
+        client2.setStateListener(new StateListener() {
+            @Override
+            public void onChange(final State newState, final State oldState) {
+                if (newState == State.RUNNING) {
+                    client2IsOk = hasOnlyActiveTasks(client2);
+                }
+            }
+        });
+    }
+
+    /**
+     * Returns {@code true} iff every local thread of {@code client} reports at least one
+     * active task and no standby task (vacuously {@code true} when no thread is reported).
+     */
+    private static boolean hasOnlyActiveTasks(final KafkaStreams client) {
+        for (final ThreadMetadata metadata : client.localThreadsMetadata()) {
+            if (!metadata.standbyTasks().isEmpty() || metadata.activeTasks().isEmpty()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /** Starts both clients. */
+    private void startClients() {
+        client1.start();
+        client2.start();
+    }
+
+    /**
+     * Waits up to 30 seconds for both OK flags to be set.
+     *
+     * @param message description of the expectation, used as prefix of the failure message
+     */
+    private void waitUntilBothClientAreOK(final String message) throws Exception {
+        try {
+            TestUtils.waitForCondition(
+                new TestCondition() {
+                    @Override
+                    public boolean conditionMet() {
+                        return client1IsOk && client2IsOk;
+                    }
+                },
+                30 * 1000,
+                message
+            );
+        } catch (final AssertionError timeout) {
+            // The per-client status has to be read after the wait has failed; building it up
+            // front would always report the state from before the clients had a chance to run.
+            // NOTE(review): assumes waitForCondition reports a timeout as AssertionError - confirm.
+            throw new AssertionError(
+                message + ": "
+                    + "Client 1 is " + (client1IsOk ? "" : "NOT ") + "OK, "
+                    + "client 2 is " + (client2IsOk ? "" : "NOT ") + "OK.",
+                timeout
+            );
+        }
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 860bcbe..f3fea48 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Cluster;
@@ -42,11 +43,17 @@ import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
+import org.apache.kafka.streams.processor.internals.StreamThread.StreamsMetricsThreadImpl;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.MockTimestampExtractor;
@@ -56,6 +63,7 @@ import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
import java.io.File;
import java.io.IOException;
@@ -72,11 +80,13 @@ import java.util.UUID;
import static java.util.Collections.singletonList;
import static org.apache.kafka.streams.processor.internals.AbstractStateManager.CHECKPOINT_FILE_NAME;
import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -824,23 +834,25 @@ public class StreamThreadTest {
final TopicPartition partition1 = new TopicPartition(changelogName1, 1);
final TopicPartition partition2 = new TopicPartition(changelogName2, 1);
internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
- .groupByKey().count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as(storeName1));
- final MaterializedInternal materialized = new MaterializedInternal(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as(storeName2),
- internalStreamsBuilder, "");
+ .groupByKey().count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as(storeName1));
+ final MaterializedInternal materialized = new MaterializedInternal(
+ Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as(storeName2),
+ internalStreamsBuilder, ""
+ );
internalStreamsBuilder.table(topic2, new ConsumedInternal(), materialized);
final StreamThread thread = createStreamThread(clientId, config, false);
final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
restoreConsumer.updatePartitions(changelogName1,
- singletonList(
- new PartitionInfo(
- changelogName1,
- 1,
- null,
- new Node[0],
- new Node[0]
- )
+ singletonList(
+ new PartitionInfo(
+ changelogName1,
+ 1,
+ null,
+ new Node[0],
+ new Node[0]
)
+ )
);
restoreConsumer.assign(Utils.mkSet(partition1, partition2));
@@ -849,12 +861,15 @@ public class StreamThreadTest {
restoreConsumer.updateEndOffsets(Collections.singletonMap(partition2, 10L));
restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition2, 0L));
// let the store1 be restored from 0 to 10; store2 be restored from 5 (checkpointed) to 10
- OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(task3), CHECKPOINT_FILE_NAME));
+ OffsetCheckpoint checkpoint = new OffsetCheckpoint(
+ new File(stateDirectory.directoryForTask(task3), CHECKPOINT_FILE_NAME));
checkpoint.write(Collections.singletonMap(partition2, 5L));
for (long i = 0L; i < 10L; i++) {
- restoreConsumer.addRecord(new ConsumerRecord<>(changelogName1, 1, i, ("K" + i).getBytes(), ("V" + i).getBytes()));
- restoreConsumer.addRecord(new ConsumerRecord<>(changelogName2, 1, i, ("K" + i).getBytes(), ("V" + i).getBytes()));
+ restoreConsumer
+ .addRecord(new ConsumerRecord<>(changelogName1, 1, i, ("K" + i).getBytes(), ("V" + i).getBytes()));
+ restoreConsumer
+ .addRecord(new ConsumerRecord<>(changelogName2, 1, i, ("K" + i).getBytes(), ("V" + i).getBytes()));
}
thread.setState(StreamThread.State.RUNNING);
@@ -883,6 +898,94 @@ public class StreamThreadTest {
assertEquals(0, thread.standbyRecords().size());
}
+ /**
+  * Processor whose callbacks all do nothing; used only to give the test topology a
+  * processor node that a state store can be connected to.
+  */
+ private static class MockProcessor implements Processor<Object, Object> {
+     @Override
+     public void init(final ProcessorContext context) {
+         // intentionally empty
+     }
+
+     @Override
+     public void process(final Object key, final Object value) {
+         // intentionally empty
+     }
+
+     @Override
+     public void punctuate(final long timestamp) {
+         // intentionally empty
+     }
+
+     @Override
+     public void close() {
+         // intentionally empty
+     }
+ }
+
+ /**
+  * A store added without calling {@code withLoggingDisabled()} must make the
+  * standby task creator hand back a task.
+  */
+ @Test
+ public void shouldCreateStandbyTask() {
+     setupInternalTopologyWithoutState();
+     final StoreBuilder<KeyValueStore<Object, Object>> myStore =
+         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myStore"), null, null);
+     internalTopologyBuilder.addStateStore(myStore, "processor1");
+
+     assertThat(createStandbyTask(), not(nullValue()));
+ }
+
+ /** With no state store in the topology the standby task creator must return {@code null}. */
+ @Test
+ public void shouldNotCreateStandbyTaskWithoutStateStores() {
+     setupInternalTopologyWithoutState();
+
+     assertThat(createStandbyTask(), nullValue());
+ }
+
+
+ /**
+  * A topology whose only state store has logging disabled must not yield a standby task:
+  * the standby task creator is expected to return {@code null}.
+  */
+ @Test
+ public void shouldNotCreateStandbyTaskIfStateStoresHaveLoggingDisabled() {
+     setupInternalTopologyWithoutState();
+     // Use the builder returned by the fluent call instead of relying on
+     // withLoggingDisabled() mutating the receiver in place.
+     final StoreBuilder<KeyValueStore<Object, Object>> storeBuilder =
+         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myStore"), null, null)
+             .withLoggingDisabled();
+     internalTopologyBuilder.addStateStore(storeBuilder, "processor1");
+
+     final StandbyTask standbyTask = createStandbyTask();
+
+     assertThat(standbyTask, nullValue());
+ }
+
+ /**
+  * Registers source node "source1" reading {@code topic1} and processor node "processor1"
+  * (backed by {@link MockProcessor}) downstream of it; no state store is added.
+  */
+ private void setupInternalTopologyWithoutState() {
+     final ProcessorSupplier mockProcessorSupplier = new ProcessorSupplier() {
+         @Override
+         public Processor get() {
+             return new MockProcessor();
+         }
+     };
+     internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
+     internalTopologyBuilder.addProcessor("processor1", mockProcessorSupplier, "source1");
+ }
+
+ /**
+  * Builds a {@code StandbyTaskCreator} from the test fixtures and asks it for a task with
+  * id {@code new TaskId(1, 2)}, a fresh mock consumer and an empty partition set.
+  *
+  * @return whatever {@code createTask} returns; the calling tests treat {@code null} as
+  *         "no standby task was created"
+  */
+ private StandbyTask createStandbyTask() {
+     final Logger log = new LogContext("test").logger(StreamThreadTest.class);
+     final StreamsMetricsThreadImpl streamsMetrics =
+         new StreamsMetricsThreadImpl(metrics, "", "", Collections.<String, String>emptyMap());
+     final StreamThread.StandbyTaskCreator creator = new StreamThread.StandbyTaskCreator(
+         internalTopologyBuilder,
+         config,
+         streamsMetrics,
+         stateDirectory,
+         streamsMetrics.taskCreatedSensor,
+         new MockChangelogReader(),
+         mockTime,
+         log
+     );
+
+     final MockConsumer<byte[], byte[]> consumer =
+         new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST);
+     final TaskId taskId = new TaskId(1, 2);
+     return creator.createTask(consumer, taskId, Collections.<TopicPartition>emptySet());
+ }
+
@Test
public void shouldAlwaysUpdateTasksMetadataAfterChangingState() {
final StreamThread thread = createStreamThread(clientId, config, false);