Posted to commits@kafka.apache.org by ab...@apache.org on 2022/02/24 20:12:33 UTC
[kafka] branch trunk updated: KAFKA-12738: track processing errors and implement constant-time task backoff (#11787)
This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cd4a1cb KAFKA-12738: track processing errors and implement constant-time task backoff (#11787)
cd4a1cb is described below
commit cd4a1cb4101abcc7cdd1d2d0d73662114108f3e5
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Thu Feb 24 12:10:31 2022 -0800
KAFKA-12738: track processing errors and implement constant-time task backoff (#11787)
Part 1 in the initial series of error handling improvements for named topologies.
*Part 1: Track tasks with errors within a named topology & implement constant-time task backoff
Part 2: Implement exponential task backoff to account for recurring errors
Part 3: Pause/backoff all tasks within a named topology in case of long backoffs or frequent errors for any individual task
Reviewers: Guozhang Wang <gu...@confluent.io>, Walker Carlson <wc...@confluent.io>
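For context, the difference between the Part 1 and Part 2 backoff strategies can be sketched as follows (a hypothetical illustration only, not code from this commit; the method and constant names are invented for the example):

    // Hypothetical sketch: constant-time vs. exponential task backoff.
    private static final long BACKOFF_MS = 15_000L;

    // Part 1 (this commit): retry after a flat 15s, regardless of how
    // many times the task has already failed.
    static boolean canRetryConstant(final long errorTime, final long now) {
        return now - errorTime > BACKOFF_MS;
    }

    // Part 2 (planned): double the wait after each consecutive failure,
    // capped here to keep the shift from overflowing.
    static boolean canRetryExponential(final long errorTime, final int consecutiveFailures, final long now) {
        final long backoffMs = BACKOFF_MS * (1L << Math.min(consecutiveFailures, 6));
        return now - errorTime > backoffMs;
    }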
---
.../streams/errors/UnknownTopologyException.java | 4 +-
.../streams/processor/internals/StreamThread.java | 3 +-
.../processor/internals/TaskExecutionMetadata.java | 98 +++++++++++++
.../streams/processor/internals/TaskExecutor.java | 42 ++++--
.../streams/processor/internals/TaskManager.java | 8 +-
.../processor/internals/TopologyMetadata.java | 12 +-
.../integration/EmitOnChangeIntegrationTest.java | 78 ----------
...Test.java => ErrorHandlingIntegrationTest.java} | 162 +++++----------------
8 files changed, 180 insertions(+), 227 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/UnknownTopologyException.java b/streams/src/main/java/org/apache/kafka/streams/errors/UnknownTopologyException.java
index 2124405..d764484 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/UnknownTopologyException.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/UnknownTopologyException.java
@@ -24,11 +24,11 @@ public class UnknownTopologyException extends StreamsException {
private static final long serialVersionUID = 1L;
public UnknownTopologyException(final String message, final String namedTopology) {
- super(message + "due to being unable to locate a Topology named " + namedTopology);
+ super(message + " due to being unable to locate a Topology named " + namedTopology);
}
public UnknownTopologyException(final String message, final Throwable throwable, final String namedTopology) {
- super(message + "due to being unable to locate a Topology named " + namedTopology, throwable);
+ super(message + " due to being unable to locate a Topology named " + namedTopology, throwable);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 2a53e32..3af07bb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -932,7 +932,8 @@ public class StreamThread extends Thread {
.ifPresent(t -> taskManager.updateTaskEndMetadata(topicPartition, t.offset()));
}
- log.debug("Main Consumer poll completed in {} ms and fetched {} records", pollLatency, numRecords);
+ log.debug("Main Consumer poll completed in {} ms and fetched {} records from partitions {}",
+ pollLatency, numRecords, records.partitions());
pollSensor.record(pollLatency, now);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
new file mode 100644
index 0000000..63b53ae
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.slf4j.Logger;
+
+import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
+
+/**
+ * Thread-safe class that tracks the processing status of active tasks. A single instance of this class is
+ * shared between all StreamThreads.
+ */
+public class TaskExecutionMetadata {
+ private final boolean hasNamedTopologies;
+ // map of topologies experiencing errors/currently under backoff
+ private final ConcurrentHashMap<String, NamedTopologyMetadata> topologyNameToErrorMetadata = new ConcurrentHashMap<>();
+
+ public TaskExecutionMetadata(final Set<String> allTopologyNames) {
+ this.hasNamedTopologies = !(allTopologyNames.size() == 1 && allTopologyNames.contains(UNNAMED_TOPOLOGY));
+ }
+
+ public boolean canProcessTask(final Task task, final long now) {
+ final String topologyName = task.id().topologyName();
+ if (!hasNamedTopologies) {
+ // TODO implement error handling/backoff for non-named topologies (needs KIP)
+ return true;
+ } else {
+ final NamedTopologyMetadata metadata = topologyNameToErrorMetadata.get(topologyName);
+ return metadata == null || (metadata.canProcess() && metadata.canProcessTask(task, now));
+ }
+ }
+
+ public void registerTaskError(final Task task, final Throwable t, final long now) {
+ if (hasNamedTopologies) {
+ final String topologyName = task.id().topologyName();
+ topologyNameToErrorMetadata.computeIfAbsent(topologyName, n -> new NamedTopologyMetadata(topologyName))
+ .registerTaskError(task, t, now);
+ }
+ }
+
+ class NamedTopologyMetadata {
+ private final Logger log;
+ private final Map<TaskId, Long> tasksToErrorTime = new ConcurrentHashMap<>();
+
+ public NamedTopologyMetadata(final String topologyName) {
+ final LogContext logContext = new LogContext(String.format("topology-name [%s] ", topologyName));
+ this.log = logContext.logger(NamedTopologyMetadata.class);
+ }
+
+ public boolean canProcess() {
+ // TODO: during long task backoffs, pause the full topology to avoid it getting out of sync
+ return true;
+ }
+
+ public boolean canProcessTask(final Task task, final long now) {
+ // TODO: implement exponential backoff, for now we just wait 15s
+ final Long errorTime = tasksToErrorTime.get(task.id());
+ if (errorTime == null) {
+ return true;
+ } else if (now - errorTime > 15000L) {
+ log.info("End backoff for task {} at t={}", task.id(), now);
+ tasksToErrorTime.remove(task.id());
+ if (tasksToErrorTime.isEmpty()) {
+ topologyNameToErrorMetadata.remove(task.id().topologyName());
+ }
+ return true;
+ } else {
+ log.debug("Skipping processing for unhealthy task {} at t={}", task.id(), now);
+ return false;
+ }
+ }
+
+ public synchronized void registerTaskError(final Task task, final Throwable t, final long now) {
+ log.info("Begin backoff for unhealthy task {} at t={}", task.id(), now);
+ tasksToErrorTime.put(task.id(), now);
+ }
+ }
+}
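The new class above is small enough to exercise in isolation. A minimal usage sketch of its lifecycle (hypothetical code, not part of this commit; `task` stands in for any active Task whose TaskId belongs to the named topology "topology_A"):

    // Hypothetical sketch of the TaskExecutionMetadata backoff lifecycle.
    final TaskExecutionMetadata metadata =
        new TaskExecutionMetadata(java.util.Collections.singleton("topology_A"));
    final long now = System.currentTimeMillis();

    metadata.canProcessTask(task, now);                            // true: no recorded error
    metadata.registerTaskError(task, new RuntimeException(), now); // processing threw: backoff begins
    metadata.canProcessTask(task, now + 1_000L);                   // false: still inside the 15s window
    metadata.canProcessTask(task, now + 16_000L);                  // true: backoff ended, task is retried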
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
index 4edc35b..cad03fb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
@@ -51,12 +51,15 @@ public class TaskExecutor {
private final boolean hasNamedTopologies;
private final ProcessingMode processingMode;
private final Tasks tasks;
+ private final TaskExecutionMetadata taskExecutionMetadata;
public TaskExecutor(final Tasks tasks,
+ final TaskExecutionMetadata taskExecutionMetadata,
final ProcessingMode processingMode,
final boolean hasNamedTopologies,
final LogContext logContext) {
this.tasks = tasks;
+ this.taskExecutionMetadata = taskExecutionMetadata;
this.processingMode = processingMode;
this.hasNamedTopologies = hasNamedTopologies;
this.log = logContext.logger(getClass());
@@ -69,23 +72,28 @@ public class TaskExecutor {
int process(final int maxNumRecords, final Time time) {
int totalProcessed = 0;
Task lastProcessed = null;
- try {
- for (final Task task : tasks.activeTasks()) {
- lastProcessed = task;
- totalProcessed += processTask(task, maxNumRecords, time);
+
+ for (final Task task : tasks.activeTasks()) {
+ final long now = time.milliseconds();
+ try {
+ if (taskExecutionMetadata.canProcessTask(task, now)) {
+ lastProcessed = task;
+ totalProcessed += processTask(task, maxNumRecords, now, time);
+ }
+ } catch (final Throwable t) {
+ taskExecutionMetadata.registerTaskError(task, t, now);
+ tasks.removeTaskFromCuccessfullyProcessedBeforeClosing(lastProcessed);
+ commitSuccessfullyProcessedTasks();
+ throw t;
}
- } catch (final Exception e) {
- tasks.removeTaskFromCuccessfullyProcessedBeforeClosing(lastProcessed);
- commitSuccessfullyProcessedTasks();
- throw e;
}
return totalProcessed;
}
- private long processTask(final Task task, final int maxNumRecords, final Time time) {
+ private long processTask(final Task task, final int maxNumRecords, final long begin, final Time time) {
int processed = 0;
- long now = time.milliseconds();
+ long now = begin;
final long then = now;
try {
@@ -94,12 +102,14 @@ public class TaskExecutor {
processed++;
}
// TODO: enable regardless of whether using named topologies
- if (hasNamedTopologies && processingMode != EXACTLY_ONCE_V2) {
+ if (processed > 0 && hasNamedTopologies && processingMode != EXACTLY_ONCE_V2) {
+ log.trace("Successfully processed task {}", task.id());
tasks.addToSuccessfullyProcessed(task);
}
} catch (final TimeoutException timeoutException) {
+ // TODO consolidate TimeoutException retries with general error handling
task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
- log.debug(
+ log.error(
String.format(
"Could not complete processing records for %s due to the following exception; will move to next task and retry later",
task.id()),
@@ -110,11 +120,11 @@ public class TaskExecutor {
"Will trigger a new rebalance and close all tasks as zombies together.", task.id());
throw e;
} catch (final StreamsException e) {
- log.error("Failed to process stream task {} due to the following error:", task.id(), e);
+ log.error(String.format("Failed to process stream task %s due to the following error:", task.id()), e);
e.setTaskId(task.id());
throw e;
} catch (final RuntimeException e) {
- log.error("Failed to process stream task {} due to the following error:", task.id(), e);
+ log.error(String.format("Failed to process stream task %s due to the following error:", task.id()), e);
throw new StreamsException(e, task.id());
} finally {
now = time.milliseconds();
@@ -132,7 +142,7 @@ public class TaskExecutor {
* @return number of committed offsets, or -1 if we are in the middle of a rebalance and cannot commit
*/
int commitTasksAndMaybeUpdateCommittableOffsets(final Collection<Task> tasksToCommit,
- final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadata) {
+ final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadata) {
int committed = 0;
for (final Task task : tasksToCommit) {
// we need to call commitNeeded first since we need to update committable offsets
@@ -253,7 +263,7 @@ public class TaskExecutor {
if (!tasks.successfullyProcessed().isEmpty()) {
log.info("Streams encountered an error when processing tasks." +
" Will commit all previously successfully processed tasks {}",
- tasks.successfullyProcessed().toString());
+ tasks.successfullyProcessed().stream().map(Task::id).collect(java.util.stream.Collectors.toSet()));
commitTasksAndMaybeUpdateCommittableOffsets(tasks.successfullyProcessed(), new HashMap<>());
}
tasks.clearSuccessfullyProcessed();
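Reading the reworked process() loop out of diff form may help; the net effect is that a failure in one task starts that task's backoff clock while preserving the progress of tasks that already completed in the same round. A condensed sketch of the new flow (simplified; metrics and some bookkeeping from the diff omitted):

    // Condensed sketch of the per-task error handling in TaskExecutor#process.
    for (final Task task : tasks.activeTasks()) {
        final long now = time.milliseconds();
        try {
            // Tasks still inside their backoff window are skipped, not failed.
            if (taskExecutionMetadata.canProcessTask(task, now)) {
                totalProcessed += processTask(task, maxNumRecords, now, time);
            }
        } catch (final Throwable t) {
            // Begin backoff for the failed task, commit the tasks that did
            // succeed this round, then rethrow to the StreamThread.
            taskExecutionMetadata.registerTaskError(task, t, now);
            commitSuccessfullyProcessedTasks();
            throw t;
        }
    }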
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 0ff6dc2..9cb2b9b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -116,7 +116,13 @@ public class TaskManager {
this.log = logContext.logger(getClass());
this.tasks = new Tasks(logContext, topologyMetadata, streamsMetrics, activeTaskCreator, standbyTaskCreator);
- this.taskExecutor = new TaskExecutor(tasks, processingMode, topologyMetadata.hasNamedTopologies(), logContext);
+ this.taskExecutor = new TaskExecutor(
+ tasks,
+ topologyMetadata.taskExecutionMetadata(),
+ processingMode,
+ topologyMetadata.hasNamedTopologies(),
+ logContext
+ );
}
void setMainConsumer(final Consumer<byte[], byte[]> mainConsumer) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
index 602abd3..dbcacf7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
@@ -69,6 +69,7 @@ public class TopologyMetadata {
private final StreamsConfig config;
private final ProcessingMode processingMode;
private final TopologyVersion version;
+ private final TaskExecutionMetadata taskExecutionMetadata;
private final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
@@ -96,8 +97,8 @@ public class TopologyMetadata {
public TopologyMetadata(final InternalTopologyBuilder builder,
final StreamsConfig config) {
- version = new TopologyVersion();
- processingMode = StreamsConfigUtils.processingMode(config);
+ this.version = new TopologyVersion();
+ this.processingMode = StreamsConfigUtils.processingMode(config);
this.config = config;
builders = new ConcurrentSkipListMap<>();
@@ -106,6 +107,7 @@ public class TopologyMetadata {
} else {
builders.put(UNNAMED_TOPOLOGY, builder);
}
+ this.taskExecutionMetadata = new TaskExecutionMetadata(builders.keySet());
}
public TopologyMetadata(final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders,
@@ -115,11 +117,11 @@ public class TopologyMetadata {
this.config = config;
this.log = LoggerFactory.getLogger(getClass());
-
this.builders = builders;
if (builders.isEmpty()) {
log.info("Created an empty KafkaStreams app with no topology");
}
+ this.taskExecutionMetadata = new TaskExecutionMetadata(builders.keySet());
}
// Need to (re)set the log here to pick up the `processId` part of the clientId in the prefix
@@ -160,6 +162,10 @@ public class TopologyMetadata {
maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(threadName);
}
+ public TaskExecutionMetadata taskExecutionMetadata() {
+ return taskExecutionMetadata;
+ }
+
public void maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(final String threadName) {
try {
lock();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
index 2d04070..25f0f30 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
@@ -30,8 +30,6 @@ import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThr
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
-import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
@@ -47,14 +45,11 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
@Category(IntegrationTest.class)
public class EmitOnChangeIntegrationTest {
@@ -177,77 +172,4 @@ public class EmitOnChangeIntegrationTest {
);
}
}
-
- @Test
- public void shouldEmitRecordsAfterFailures() throws Exception {
- final Properties properties = mkObjectProperties(
- mkMap(
- mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
- mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
- mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
- mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1),
- mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0),
- mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000L),
- mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class),
- mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
- mkEntry(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000)
- )
- );
-
- try (final KafkaStreamsNamedTopologyWrapper kafkaStreams = new KafkaStreamsNamedTopologyWrapper(properties)) {
- kafkaStreams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
-
- final NamedTopologyBuilder builder = kafkaStreams.newNamedTopologyBuilder("topology_A");
- final AtomicInteger noOutputExpected = new AtomicInteger(0);
- final AtomicInteger twoOutputExpected = new AtomicInteger(0);
- builder.stream(inputTopic2).peek((k, v) -> twoOutputExpected.incrementAndGet()).to(outputTopic2);
- builder.stream(inputTopic)
- .peek((k, v) -> {
- throw new RuntimeException("Kaboom");
- })
- .peek((k, v) -> noOutputExpected.incrementAndGet())
- .to(outputTopic);
-
- kafkaStreams.addNamedTopology(builder.build());
-
- StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
- inputTopic,
- Arrays.asList(
- new KeyValue<>(1, "A")
- ),
- TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- IntegerSerializer.class,
- StringSerializer.class,
- new Properties()),
- 0L);
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
- inputTopic2,
- Arrays.asList(
- new KeyValue<>(1, "A"),
- new KeyValue<>(1, "B")
- ),
- TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- IntegerSerializer.class,
- StringSerializer.class,
- new Properties()),
- 0L);
- IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
- TestUtils.consumerConfig(
- CLUSTER.bootstrapServers(),
- IntegerDeserializer.class,
- StringDeserializer.class
- ),
- outputTopic2,
- Arrays.asList(
- new KeyValue<>(1, "A"),
- new KeyValue<>(1, "B")
- )
- );
- assertThat(noOutputExpected.get(), equalTo(0));
- assertThat(twoOutputExpected.get(), equalTo(2));
- }
- }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java
similarity index 52%
copy from streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
copy to streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java
index 2d04070..40d2cf9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java
@@ -22,19 +22,21 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -43,21 +45,16 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
@Category(IntegrationTest.class)
-public class EmitOnChangeIntegrationTest {
+public class ErrorHandlingIntegrationTest {
private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
@@ -74,145 +71,58 @@ public class EmitOnChangeIntegrationTest {
@Rule
public TestName testName = new TestName();
- private static String inputTopic;
- private static String inputTopic2;
- private static String outputTopic;
- private static String outputTopic2;
- private static String appId = "";
+ private final String testId = safeUniqueTestName(getClass(), testName);
+ private final String appId = "appId_" + testId;
+ private final Properties properties = props();
+
+ // Task 0
+ private final String inputTopic = "input" + testId;
+ private final String outputTopic = "output" + testId;
+ // Task 1
+ private final String errorInputTopic = "error-input" + testId;
+ private final String errorOutputTopic = "error-output" + testId;
@Before
public void setup() {
- final String testId = safeUniqueTestName(getClass(), testName);
- appId = "appId_" + testId;
- inputTopic = "input" + testId;
- inputTopic2 = "input2" + testId;
- outputTopic = "output" + testId;
- outputTopic2 = "output2" + testId;
- IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic, outputTopic, inputTopic2, outputTopic2);
+ IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, errorInputTopic, errorOutputTopic, inputTopic, outputTopic);
}
- @Test
- public void shouldEmitSameRecordAfterFailover() throws Exception {
- final Properties properties = mkObjectProperties(
+ private Properties props() {
+ return mkObjectProperties(
mkMap(
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
- mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
- mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1),
+ mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath()),
mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0),
- mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000L),
+ mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 15000L),
mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class),
mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
- mkEntry(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000)
- )
+ mkEntry(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000))
);
-
- final AtomicBoolean shouldThrow = new AtomicBoolean(true);
- final StreamsBuilder builder = new StreamsBuilder();
- builder.table(inputTopic, Materialized.as("test-store"))
- .toStream()
- .map((key, value) -> {
- if (shouldThrow.compareAndSet(true, false)) {
- throw new RuntimeException("Kaboom");
- } else {
- return new KeyValue<>(key, value);
- }
- })
- .to(outputTopic);
- builder.stream(inputTopic2).to(outputTopic2);
-
- try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
- kafkaStreams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
- StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
-
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
- inputTopic,
- Arrays.asList(
- new KeyValue<>(1, "A"),
- new KeyValue<>(1, "B")
- ),
- TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- IntegerSerializer.class,
- StringSerializer.class,
- new Properties()),
- 0L);
-
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
- inputTopic2,
- Arrays.asList(
- new KeyValue<>(1, "A"),
- new KeyValue<>(1, "B")
- ),
- TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- IntegerSerializer.class,
- StringSerializer.class,
- new Properties()),
- 0L);
-
- IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
- TestUtils.consumerConfig(
- CLUSTER.bootstrapServers(),
- IntegerDeserializer.class,
- StringDeserializer.class
- ),
- outputTopic,
- Arrays.asList(
- new KeyValue<>(1, "A"),
- new KeyValue<>(1, "B")
- )
- );
- IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
- TestUtils.consumerConfig(
- CLUSTER.bootstrapServers(),
- IntegerDeserializer.class,
- StringDeserializer.class
- ),
- outputTopic2,
- Arrays.asList(
- new KeyValue<>(1, "A"),
- new KeyValue<>(1, "B")
- )
- );
- }
}
@Test
- public void shouldEmitRecordsAfterFailures() throws Exception {
- final Properties properties = mkObjectProperties(
- mkMap(
- mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
- mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
- mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
- mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1),
- mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0),
- mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000L),
- mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class),
- mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
- mkEntry(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000)
- )
- );
+ public void shouldBackOffTaskAndEmitDataWithinSameTopology() throws Exception {
+ final AtomicInteger noOutputExpected = new AtomicInteger(0);
+ final AtomicInteger outputExpected = new AtomicInteger(0);
try (final KafkaStreamsNamedTopologyWrapper kafkaStreams = new KafkaStreamsNamedTopologyWrapper(properties)) {
kafkaStreams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
-
+
final NamedTopologyBuilder builder = kafkaStreams.newNamedTopologyBuilder("topology_A");
- final AtomicInteger noOutputExpected = new AtomicInteger(0);
- final AtomicInteger twoOutputExpected = new AtomicInteger(0);
- builder.stream(inputTopic2).peek((k, v) -> twoOutputExpected.incrementAndGet()).to(outputTopic2);
- builder.stream(inputTopic)
+ builder.stream(inputTopic).peek((k, v) -> outputExpected.incrementAndGet()).to(outputTopic);
+ builder.stream(errorInputTopic)
.peek((k, v) -> {
throw new RuntimeException("Kaboom");
})
.peek((k, v) -> noOutputExpected.incrementAndGet())
- .to(outputTopic);
+ .to(errorOutputTopic);
+
+ kafkaStreams.addNamedTopology(builder.build());
- kafkaStreams.addNamedTopology(builder.build());
-
StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
- inputTopic,
+ errorInputTopic,
Arrays.asList(
new KeyValue<>(1, "A")
),
@@ -223,7 +133,7 @@ public class EmitOnChangeIntegrationTest {
new Properties()),
0L);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
- inputTopic2,
+ inputTopic,
Arrays.asList(
new KeyValue<>(1, "A"),
new KeyValue<>(1, "B")
@@ -240,14 +150,14 @@ public class EmitOnChangeIntegrationTest {
IntegerDeserializer.class,
StringDeserializer.class
),
- outputTopic2,
+ outputTopic,
Arrays.asList(
new KeyValue<>(1, "A"),
new KeyValue<>(1, "B")
)
);
assertThat(noOutputExpected.get(), equalTo(0));
- assertThat(twoOutputExpected.get(), equalTo(2));
+ assertThat(outputExpected.get(), equalTo(2));
}
}
}