You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/06/11 04:04:15 UTC
[kafka] 01/01: MINOR: code cleanup for Kafka Streams task classes
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch kafka-9441-refactor-tm
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 1966d4363e91f5045b014ce375ce0a58ee2a5a79
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Wed Jun 10 20:05:25 2020 -0700
MINOR: code cleanup for Kafka Streams task classes
---
.../streams/processor/internals/AbstractTask.java | 63 +-
.../processor/internals/ActiveTaskCreator.java | 14 +-
.../streams/processor/internals/StandbyTask.java | 146 ++--
.../processor/internals/StandbyTaskCreator.java | 16 +-
.../streams/processor/internals/StreamTask.java | 968 +++++++++++----------
.../kafka/streams/processor/internals/Task.java | 111 +--
.../streams/processor/internals/TaskManager.java | 2 +-
.../processor/internals/StandbyTaskTest.java | 11 +-
.../processor/internals/StreamTaskTest.java | 94 +-
.../processor/internals/TaskManagerTest.java | 3 +-
.../StreamThreadStateStoreProviderTest.java | 15 +-
.../apache/kafka/streams/TopologyTestDriver.java | 14 +-
12 files changed, 743 insertions(+), 714 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index f59571b..8c342a6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -16,82 +16,85 @@
*/
package org.apache.kafka.streams.processor.internals;
-import java.util.List;
-import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
+import org.slf4j.Logger;
import java.util.Collection;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
-import org.slf4j.Logger;
import static org.apache.kafka.streams.processor.internals.Task.State.CLOSED;
import static org.apache.kafka.streams.processor.internals.Task.State.CREATED;
public abstract class AbstractTask implements Task {
private Task.State state = CREATED;
- protected Set<TopicPartition> inputPartitions;
- protected ProcessorTopology topology;
protected final TaskId id;
+ protected final ProcessorTopology topology;
protected final StateDirectory stateDirectory;
protected final ProcessorStateManager stateMgr;
+ protected Set<TopicPartition> inputPartitions;
+
AbstractTask(final TaskId id,
final ProcessorTopology topology,
final StateDirectory stateDirectory,
final ProcessorStateManager stateMgr,
final Set<TopicPartition> inputPartitions) {
this.id = id;
- this.stateMgr = stateMgr;
this.topology = topology;
- this.inputPartitions = inputPartitions;
+ this.stateMgr = stateMgr;
this.stateDirectory = stateDirectory;
+ this.inputPartitions = inputPartitions;
}
@Override
- public TaskId id() {
- return id;
+ public void revive() {
+ if (state == CLOSED) {
+ transitionTo(CREATED);
+ } else {
+ throw new IllegalStateException("Illegal state " + state() + " while reviving task " + id);
+ }
}
@Override
- public Set<TopicPartition> inputPartitions() {
- return inputPartitions;
+ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions) {
+ stateMgr.markChangelogAsCorrupted(partitions);
}
@Override
- public Collection<TopicPartition> changelogPartitions() {
- return stateMgr.changelogPartitions();
+ public final TaskId id() {
+ return id;
}
@Override
- public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions) {
- stateMgr.markChangelogAsCorrupted(partitions);
+ public final Task.State state() {
+ return state;
}
@Override
- public StateStore getStore(final String name) {
- return stateMgr.getStore(name);
+ public void updateInputPartitions(final Set<TopicPartition> topicPartitions,
+ final Map<String, List<String>> nodeToSourceTopics) {
+ this.inputPartitions = topicPartitions;
+ topology.updateSourceTopics(nodeToSourceTopics);
}
@Override
- public boolean isClosed() {
- return state() == State.CLOSED;
+ public Set<TopicPartition> inputPartitions() {
+ return inputPartitions;
}
@Override
- public final Task.State state() {
- return state;
+ public StateStore getStore(final String name) {
+ return stateMgr.getStore(name);
}
@Override
- public void revive() {
- if (state == CLOSED) {
- transitionTo(CREATED);
- } else {
- throw new IllegalStateException("Illegal state " + state() + " while reviving task " + id);
- }
+ public Collection<TopicPartition> changelogPartitions() {
+ return stateMgr.changelogPartitions();
}
final void transitionTo(final Task.State newState) {
@@ -118,10 +121,4 @@ public abstract class AbstractTask implements Task {
}
}
}
-
- @Override
- public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) {
- this.inputPartitions = topicPartitions;
- topology.updateSourceTopics(nodeToSourceTopics);
- }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index 0a1f47e..4d2dc5f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -230,17 +230,17 @@ class ActiveTaskCreator {
final StreamTask task = new StreamTask(
taskId,
- partitions,
topology,
- consumer,
- config,
- streamsMetrics,
stateDirectory,
+ stateManager,
+ partitions,
+ config,
+ context,
cache,
+ streamsMetrics,
time,
- stateManager,
- recordCollector,
- context
+ consumer,
+ recordCollector
);
log.trace("Created task {} with assigned partitions {}", taskId, partitions);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 9c80fd7..90f0aeb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -39,11 +39,13 @@ import java.util.Set;
* A StandbyTask
*/
public class StandbyTask extends AbstractTask implements Task {
- private final Logger log;
private final String logPrefix;
- private final Sensor closeTaskSensor;
- private final boolean eosEnabled;
+ private final Logger log;
private final InternalProcessorContext processorContext;
+ private final boolean eosEnabled;
+
+ // metrics
+ private final Sensor closeTaskSensor;
private Map<TopicPartition, Long> offsetSnapshotSinceLastCommit;
@@ -57,30 +59,31 @@ public class StandbyTask extends AbstractTask implements Task {
* @param stateDirectory the {@link StateDirectory} created by the thread
*/
StandbyTask(final TaskId id,
- final Set<TopicPartition> partitions,
final ProcessorTopology topology,
- final StreamsConfig config,
- final StreamsMetricsImpl metrics,
- final ProcessorStateManager stateMgr,
final StateDirectory stateDirectory,
+ final ProcessorStateManager stateMgr,
+ final Set<TopicPartition> partitions,
+ final StreamsConfig config,
+ final InternalProcessorContext processorContext,
final ThreadCache cache,
- final InternalProcessorContext processorContext) {
+ final StreamsMetricsImpl metrics) {
super(id, topology, stateDirectory, stateMgr, partitions);
- this.processorContext = processorContext;
- processorContext.transitionToStandby(cache);
+ // logging
final String threadIdPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName());
- logPrefix = threadIdPrefix + String.format("%s [%s] ", "standby-task", id);
+ logPrefix = threadIdPrefix + String.format("%s [%s] ", "standby-task", id.toString());
final LogContext logContext = new LogContext(logPrefix);
log = logContext.logger(getClass());
- closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), metrics);
+ // members
+ this.processorContext = processorContext;
+ processorContext.transitionToStandby(cache);
+
+ // config
eosEnabled = StreamThread.eosEnabled(config);
- }
- @Override
- public boolean isActive() {
- return false;
+ // metrics
+ closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), metrics);
}
/**
@@ -127,43 +130,6 @@ public class StandbyTask extends AbstractTask implements Task {
log.trace("No-op resume with state {}", state());
}
- /**
- * 1. flush store
- * 2. write checkpoint file
- *
- * @throws StreamsException fatal error, should close the thread
- */
- @Override
- public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
- if (state() == State.RUNNING || state() == State.SUSPENDED) {
- stateMgr.flush();
- log.info("Task ready for committing");
- } else {
- throw new IllegalStateException("Illegal state " + state() + " while preparing standby task " + id + " for committing ");
- }
-
- return Collections.emptyMap();
- }
-
- @Override
- public void postCommit() {
- if (state() == State.RUNNING || state() == State.SUSPENDED) {
- // since there's no written offsets we can checkpoint with empty map,
- // and the state current offset would be used to checkpoint
- stateMgr.checkpoint(Collections.emptyMap());
- offsetSnapshotSinceLastCommit = new HashMap<>(stateMgr.changelogOffsets());
- log.info("Finalized commit");
- } else {
- throw new IllegalStateException("Illegal state " + state() + " while post committing standby task " + id);
- }
- }
-
- @Override
- public void closeClean() {
- close(true);
- log.info("Closed clean");
- }
-
@Override
public void closeDirty() {
close(false);
@@ -171,20 +137,9 @@ public class StandbyTask extends AbstractTask implements Task {
}
@Override
- public void closeAndRecycleState() {
- suspend();
- prepareCommit();
-
- if (state() == State.CREATED || state() == State.SUSPENDED) {
- stateMgr.recycle();
- } else {
- throw new IllegalStateException("Illegal state " + state() + " while closing standby task " + id);
- }
-
- closeTaskSensor.record();
- transitionTo(State.CLOSED);
-
- log.info("Closed clean and recycled state");
+ public void closeClean() {
+ close(true);
+ log.info("Closed clean");
}
private void close(final boolean clean) {
@@ -225,14 +180,57 @@ public class StandbyTask extends AbstractTask implements Task {
}
@Override
+ public void closeAndRecycleState() {
+ suspend();
+ prepareCommit();
+
+ if (state() == State.CREATED || state() == State.SUSPENDED) {
+ stateMgr.recycle();
+ } else {
+ throw new IllegalStateException("Illegal state " + state() + " while closing standby task " + id);
+ }
+
+ closeTaskSensor.record();
+ transitionTo(State.CLOSED);
+
+ log.info("Closed clean and recycled state");
+ }
+
+ @Override
public boolean commitNeeded() {
// we can commit if the store's offset has changed since last commit
return offsetSnapshotSinceLastCommit == null || !offsetSnapshotSinceLastCommit.equals(stateMgr.changelogOffsets());
}
+ /**
+ * 1. flush store
+ * 2. write checkpoint file
+ *
+ * @throws StreamsException fatal error, should close the thread
+ */
@Override
- public Map<TopicPartition, Long> changelogOffsets() {
- return Collections.unmodifiableMap(stateMgr.changelogOffsets());
+ public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+ if (state() == State.RUNNING || state() == State.SUSPENDED) {
+ stateMgr.flush();
+ log.info("Task ready for committing");
+ } else {
+ throw new IllegalStateException("Illegal state " + state() + " while preparing standby task " + id + " for committing ");
+ }
+
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public void postCommit() {
+ if (state() == State.RUNNING || state() == State.SUSPENDED) {
+ // since there's no written offsets we can checkpoint with empty map,
+ // and the state current offset would be used to checkpoint
+ stateMgr.checkpoint(Collections.emptyMap());
+ offsetSnapshotSinceLastCommit = new HashMap<>(stateMgr.changelogOffsets());
+ log.info("Finalized commit");
+ } else {
+ throw new IllegalStateException("Illegal state " + state() + " while post committing standby task " + id);
+ }
}
@Override
@@ -240,6 +238,16 @@ public class StandbyTask extends AbstractTask implements Task {
throw new IllegalStateException("Attempted to add records to task " + id() + " for invalid input partition " + partition);
}
+ @Override
+ public boolean isActive() {
+ return false;
+ }
+
+ @Override
+ public Map<TopicPartition, Long> changelogOffsets() {
+ return Collections.unmodifiableMap(stateMgr.changelogOffsets());
+ }
+
InternalProcessorContext processorContext() {
return processorContext;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
index 443db8e..d7a6586 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
@@ -94,7 +94,7 @@ class StandbyTaskCreator {
dummyCache
);
- createdTasks.add(createStandbyTask(taskId, partitions, topology, stateManager, context));
+ createdTasks.add(createStandbyTask(taskId, topology, stateManager, partitions, context));
} else {
log.trace(
"Skipped standby task {} with assigned partitions {} " +
@@ -117,28 +117,28 @@ class StandbyTaskCreator {
return createStandbyTask(
streamTask.id(),
- partitions,
builder.buildSubtopology(streamTask.id.topicGroupId),
stateManager,
+ partitions,
context
);
}
StandbyTask createStandbyTask(final TaskId taskId,
- final Set<TopicPartition> partitions,
final ProcessorTopology topology,
final ProcessorStateManager stateManager,
+ final Set<TopicPartition> partitions,
final InternalProcessorContext context) {
final StandbyTask task = new StandbyTask(
taskId,
- partitions,
topology,
- config,
- streamsMetrics,
- stateManager,
stateDirectory,
+ stateManager,
+ partitions,
+ config,
+ context,
dummyCache,
- context
+ streamsMetrics
);
log.trace("Created task {} with assigned partitions {}", taskId, partitions);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 3925acc..147f5ec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -71,26 +71,26 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
// visible for testing
static final byte LATEST_MAGIC_BYTE = 1;
- private final Time time;
- private final Logger log;
private final String logPrefix;
+ private final Logger log;
+
+ private final InternalProcessorContext processorContext;
+ private final Time time;
private final Consumer<byte[], byte[]> mainConsumer;
+ private final RecordCollector recordCollector;
+ private final PartitionGroup.RecordInfo recordInfo;
+ private final RecordQueueCreator recordQueueCreator;
+ private final PartitionGroup partitionGroup;
+ private final PunctuationQueue streamTimePunctuationQueue;
+ private final PunctuationQueue systemTimePunctuationQueue;
+ private final Map<TopicPartition, Long> consumedOffsets;
// we want to abstract eos logic out of StreamTask, however
// there's still an optimization that requires this info to be
// leaked into this class, which is to checkpoint after committing if EOS is not enabled.
private final boolean eosEnabled;
-
private final long maxTaskIdleMs;
private final int maxBufferedSize;
- private final PartitionGroup partitionGroup;
- private final RecordCollector recordCollector;
- private final PartitionGroup.RecordInfo recordInfo;
- private final Map<TopicPartition, Long> consumedOffsets;
- private final PunctuationQueue streamTimePunctuationQueue;
- private final PunctuationQueue systemTimePunctuationQueue;
-
- private long processTimeMs = 0L;
private final Sensor closeTaskSensor;
private final Sensor processRatioSensor;
@@ -99,56 +99,73 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
private final Sensor bufferedRecordsSensor;
private final Sensor enforcedProcessingSensor;
private final Map<String, Sensor> e2eLatencySensors = new HashMap<>();
- private final InternalProcessorContext processorContext;
-
- private final RecordQueueCreator recordQueueCreator;
- private long idleStartTimeMs;
private boolean commitNeeded = false;
private boolean commitRequested = false;
-
+ private long idleStartTimeMs = RecordQueue.UNKNOWN;
+ private long processTimeMs = 0L;
private Map<TopicPartition, Long> checkpoint = null;
public StreamTask(final TaskId id,
- final Set<TopicPartition> partitions,
final ProcessorTopology topology,
- final Consumer<byte[], byte[]> mainConsumer,
- final StreamsConfig config,
- final StreamsMetricsImpl streamsMetrics,
final StateDirectory stateDirectory,
+ final ProcessorStateManager stateMgr,
+ final Set<TopicPartition> partitions,
+ final StreamsConfig config,
+ final InternalProcessorContext processorContext,
final ThreadCache cache,
+ final StreamsMetricsImpl streamsMetrics,
final Time time,
- final ProcessorStateManager stateMgr,
- final RecordCollector recordCollector,
- final InternalProcessorContext processorContext) {
+ final Consumer<byte[], byte[]> mainConsumer,
+ final RecordCollector recordCollector) {
super(id, topology, stateDirectory, stateMgr, partitions);
- this.mainConsumer = mainConsumer;
- this.processorContext = processorContext;
- processorContext.transitionToActive(this, recordCollector, cache);
+ final String taskId = id.toString();
+ final String threadId = Thread.currentThread().getName();
- final String threadIdPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName());
- logPrefix = threadIdPrefix + String.format("%s [%s] ", "task", id);
+ // logging
+ final String threadIdPrefix = String.format("stream-thread [%s] ", threadId);
+ logPrefix = threadIdPrefix + String.format("%s [%s] ", "task", taskId);
final LogContext logContext = new LogContext(logPrefix);
log = logContext.logger(getClass());
+ // members
+ this.processorContext = processorContext;
+ processorContext.transitionToActive(this, recordCollector, cache);
this.time = time;
+ this.mainConsumer = mainConsumer;
this.recordCollector = recordCollector;
+
+ recordInfo = new PartitionGroup.RecordInfo();
+ recordQueueCreator = new RecordQueueCreator(logContext, config.defaultTimestampExtractor(), config.defaultDeserializationExceptionHandler());
+ partitionGroup = new PartitionGroup(
+ createPartitionQueues(),
+ TaskMetrics.recordLatenessSensor(threadId, taskId, streamsMetrics)
+ );
+ streamTimePunctuationQueue = new PunctuationQueue();
+ systemTimePunctuationQueue = new PunctuationQueue();
+ consumedOffsets = new HashMap<>();
+
+ stateMgr.registerGlobalStateStores(topology.globalStateStores());
+
+ // config
eosEnabled = StreamThread.eosEnabled(config);
+ maxTaskIdleMs = config.getLong(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
+ maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
- final String threadId = Thread.currentThread().getName();
+ // metrics
closeTaskSensor = ThreadMetrics.closeTaskSensor(threadId, streamsMetrics);
- final String taskId = id.toString();
+ processRatioSensor = TaskMetrics.activeProcessRatioSensor(threadId, taskId, streamsMetrics);
+ processLatencySensor = TaskMetrics.processLatencySensor(threadId, taskId, streamsMetrics);
+ punctuateLatencySensor = TaskMetrics.punctuateSensor(threadId, taskId, streamsMetrics);
+ bufferedRecordsSensor = TaskMetrics.activeBufferedRecordsSensor(threadId, taskId, streamsMetrics);
+
if (streamsMetrics.version() == Version.FROM_0100_TO_24) {
final Sensor parent = ThreadMetrics.commitOverTasksSensor(threadId, streamsMetrics);
enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics, parent);
} else {
enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics);
}
- processRatioSensor = TaskMetrics.activeProcessRatioSensor(threadId, taskId, streamsMetrics);
- processLatencySensor = TaskMetrics.processLatencySensor(threadId, taskId, streamsMetrics);
- punctuateLatencySensor = TaskMetrics.punctuateSensor(threadId, taskId, streamsMetrics);
- bufferedRecordsSensor = TaskMetrics.activeBufferedRecordsSensor(threadId, taskId, streamsMetrics);
for (final String terminalNode : topology.terminalNodes()) {
e2eLatencySensors.put(
@@ -164,22 +181,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
ProcessorNodeMetrics.recordE2ELatencySensor(threadId, taskId, processorId, RecordingLevel.INFO, streamsMetrics)
);
}
-
- streamTimePunctuationQueue = new PunctuationQueue();
- systemTimePunctuationQueue = new PunctuationQueue();
- maxTaskIdleMs = config.getLong(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
- maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
-
- // initialize the consumed and committed offset cache
- consumedOffsets = new HashMap<>();
-
- recordQueueCreator = new RecordQueueCreator(logContext, config.defaultTimestampExtractor(), config.defaultDeserializationExceptionHandler());
-
- recordInfo = new PartitionGroup.RecordInfo();
- partitionGroup = new PartitionGroup(createPartitionQueues(),
- TaskMetrics.recordLatenessSensor(threadId, taskId, streamsMetrics));
-
- stateMgr.registerGlobalStateStores(topology.globalStateStores());
}
// create queues for each assigned partition and associate them
@@ -192,11 +193,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
return partitionQueues;
}
- @Override
- public boolean isActive() {
- return true;
- }
-
/**
* @throws LockException could happen when multi-threads within the single instance, could retry
* @throws TimeoutException if initializing record collector timed out
@@ -246,6 +242,59 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
}
}
+ private void initializeMetadata() {
+ try {
+ final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata = mainConsumer.committed(inputPartitions()).entrySet().stream()
+ .filter(e -> e.getValue() != null)
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ initializeTaskTime(offsetsAndMetadata);
+ } catch (final TimeoutException e) {
+ log.warn("Encountered {} while trying to fetch committed offsets, will retry initializing the metadata in the next loop." +
+ "\nConsider overwriting consumer config {} to a larger value to avoid timeout errors",
+ e.toString(),
+ ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
+
+ throw e;
+ } catch (final KafkaException e) {
+ throw new StreamsException(String.format("task [%s] Failed to initialize offsets for %s", id, inputPartitions()), e);
+ }
+ }
+
+ private void initializeTaskTime(final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata) {
+ for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsetsAndMetadata.entrySet()) {
+ final TopicPartition partition = entry.getKey();
+ final OffsetAndMetadata metadata = entry.getValue();
+
+ if (metadata != null) {
+ final long committedTimestamp = decodeTimestamp(metadata.metadata());
+ partitionGroup.setPartitionTime(partition, committedTimestamp);
+ log.debug("A committed timestamp was detected: setting the partition time of partition {}"
+ + " to {} in stream task {}", partition, committedTimestamp, id);
+ } else {
+ log.debug("No committed timestamp was found in metadata for partition {}", partition);
+ }
+ }
+
+ final Set<TopicPartition> nonCommitted = new HashSet<>(inputPartitions());
+ nonCommitted.removeAll(offsetsAndMetadata.keySet());
+ for (final TopicPartition partition : nonCommitted) {
+ log.debug("No committed offset for partition {}, therefore no timestamp can be found for this partition", partition);
+ }
+ }
+
+ private void initializeTopology() {
+ // initialize the task by initializing all its processor nodes in the topology
+ log.trace("Initializing processor nodes of the topology");
+ for (final ProcessorNode<?, ?> node : topology.processors()) {
+ processorContext.setCurrentNode(node);
+ try {
+ node.init(processorContext);
+ } finally {
+ processorContext.setCurrentNode(null);
+ }
+ }
+ }
+
@Override
public void suspend() {
switch (state()) {
@@ -333,296 +382,119 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
}
}
+ @Override
+ public void closeDirty() {
+ close(false);
+ log.info("Closed dirty");
+ }
+
+ @Override
+ public void closeClean() {
+ close(true);
+ log.info("Closed clean");
+ }
+
/**
- * @return offsets that should be committed for this task
+ * the following order must be followed:
+ * 1. checkpoint the state manager -- even if we crash before this step, EOS is still guaranteed
+ * 2. then if we are closing on EOS and dirty, wipe out the state store directory
+ * 3. finally release the state manager lock
*/
- @Override
- public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+ private void close(final boolean clean) {
+ if (clean) {
+ executeAndMaybeSwallow(true, this::writeCheckpointIfNeed, "state manager checkpoint", log);
+ }
+
switch (state()) {
- case RUNNING:
+ case CREATED:
case RESTORING:
case SUSPENDED:
- maybeScheduleCheckpoint();
- stateMgr.flush();
- recordCollector.flush();
+ // first close state manager (which is idempotent) then close the record collector
+ // if the latter throws and we re-close dirty which would close the state manager again.
+ executeAndMaybeSwallow(
+ clean,
+ () -> StateManagerUtil.closeStateManager(
+ log,
+ logPrefix,
+ clean,
+ eosEnabled,
+ stateMgr,
+ stateDirectory,
+ TaskType.ACTIVE
+ ),
+ "state manager close",
+ log);
- log.debug("Prepared task for committing");
+ executeAndMaybeSwallow(clean, recordCollector::close, "record collector close", log);
break;
- case CREATED:
case CLOSED:
- throw new IllegalStateException("Illegal state " + state() + " while preparing active task " + id + " for committing");
+ log.trace("Skip closing since state is {}", state());
+ return;
+
+ case RUNNING:
+ throw new IllegalStateException("Illegal state " + state() + " while closing active task " + id);
default:
- throw new IllegalStateException("Unknown state " + state() + " while preparing active task " + id + " for committing");
+ throw new IllegalStateException("Unknown state " + state() + " while closing active task " + id);
}
- return committableOffsetsAndMetadata();
+ partitionGroup.clear();
+ closeTaskSensor.record();
+
+ transitionTo(State.CLOSED);
}
- private Map<TopicPartition, OffsetAndMetadata> committableOffsetsAndMetadata() {
- final Map<TopicPartition, OffsetAndMetadata> committableOffsets;
+ @Override
+ public void closeAndRecycleState() {
+ suspend();
+ prepareCommit();
+ writeCheckpointIfNeed();
switch (state()) {
case CREATED:
- case RESTORING:
- committableOffsets = Collections.emptyMap();
-
- break;
-
- case RUNNING:
case SUSPENDED:
- final Map<TopicPartition, Long> partitionTimes = extractPartitionTimes();
-
- committableOffsets = new HashMap<>(consumedOffsets.size());
- for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
- final TopicPartition partition = entry.getKey();
- Long offset = partitionGroup.headRecordOffset(partition);
- if (offset == null) {
- try {
- offset = mainConsumer.position(partition);
- } catch (final TimeoutException error) {
- // the `consumer.position()` call should never block, because we know that we did process data
- // for the requested partition and thus the consumer should have a valid local position
- // that it can return immediately
-
- // hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException`
- throw new IllegalStateException(error);
- } catch (final KafkaException fatal) {
- throw new StreamsException(fatal);
- }
- }
- final long partitionTime = partitionTimes.get(partition);
- committableOffsets.put(partition, new OffsetAndMetadata(offset, encodeTimestamp(partitionTime)));
- }
+ stateMgr.recycle();
+ recordCollector.close();
break;
+ case RESTORING: // we should have transitioned to `SUSPENDED` already
+ case RUNNING: // we should have transitioned to `SUSPENDED` already
case CLOSED:
- throw new IllegalStateException("Illegal state " + state() + " while getting commitable offsets for active task " + id);
-
+ throw new IllegalStateException("Illegal state " + state() + " while recycling active task " + id);
default:
- throw new IllegalStateException("Unknown state " + state() + " while post committing active task " + id);
+ throw new IllegalStateException("Unknown state " + state() + " while recycling active task " + id);
}
- return committableOffsets;
- }
-
- @Override
- public void postCommit() {
- commitRequested = false;
- commitNeeded = false;
+ partitionGroup.clear();
+ closeTaskSensor.record();
- switch (state()) {
- case RESTORING:
- writeCheckpointIfNeed();
+ transitionTo(State.CLOSED);
- break;
-
- case RUNNING:
- if (!eosEnabled) { // if RUNNING, checkpoint only for non-eos
- writeCheckpointIfNeed();
- }
-
- break;
-
- case SUSPENDED:
- writeCheckpointIfNeed();
- // we cannot `clear()` the `PartitionGroup` in `suspend()` already, but only after committing,
- // because otherwise we loose the partition-time information
- partitionGroup.clear();
-
- break;
-
- case CREATED:
- case CLOSED:
- throw new IllegalStateException("Illegal state " + state() + " while post committing active task " + id);
-
- default:
- throw new IllegalStateException("Unknown state " + state() + " while post committing active task " + id);
- }
-
- log.debug("Committed");
- }
-
- private Map<TopicPartition, Long> extractPartitionTimes() {
- final Map<TopicPartition, Long> partitionTimes = new HashMap<>();
- for (final TopicPartition partition : partitionGroup.partitions()) {
- partitionTimes.put(partition, partitionGroup.partitionTimestamp(partition));
- }
- return partitionTimes;
- }
-
- @Override
- public void closeClean() {
- close(true);
- log.info("Closed clean");
- }
-
- @Override
- public void closeDirty() {
- close(false);
- log.info("Closed dirty");
- }
-
- @Override
- public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) {
- super.update(topicPartitions, nodeToSourceTopics);
- partitionGroup.updatePartitions(topicPartitions, recordQueueCreator::createQueue);
- }
-
- @Override
- public void closeAndRecycleState() {
- suspend();
- prepareCommit();
- writeCheckpointIfNeed();
-
- switch (state()) {
- case CREATED:
- case SUSPENDED:
- stateMgr.recycle();
- recordCollector.close();
-
- break;
-
- case RESTORING: // we should have transitioned to `SUSPENDED` already
- case RUNNING: // we should have transitioned to `SUSPENDED` already
- case CLOSED:
- throw new IllegalStateException("Illegal state " + state() + " while recycling active task " + id);
- default:
- throw new IllegalStateException("Unknown state " + state() + " while recycling active task " + id);
- }
-
- partitionGroup.clear();
- closeTaskSensor.record();
-
- transitionTo(State.CLOSED);
-
- log.info("Closed clean and recycled state");
- }
-
- private void maybeScheduleCheckpoint() {
- switch (state()) {
- case RESTORING:
- case SUSPENDED:
- this.checkpoint = checkpointableOffsets();
-
- break;
-
- case RUNNING:
- if (!eosEnabled) {
- this.checkpoint = checkpointableOffsets();
- }
-
- break;
-
- case CREATED:
- case CLOSED:
- throw new IllegalStateException("Illegal state " + state() + " while scheduling checkpoint for active task " + id);
-
- default:
- throw new IllegalStateException("Unknown state " + state() + " while scheduling checkpoint for active task " + id);
- }
- }
-
- private void writeCheckpointIfNeed() {
- if (commitNeeded) {
- throw new IllegalStateException("A checkpoint should only be written if no commit is needed.");
- }
- if (checkpoint != null) {
- stateMgr.checkpoint(checkpoint);
- checkpoint = null;
- }
- }
+ log.info("Closed clean and recycled state");
+ }
/**
- * <pre>
- * the following order must be followed:
- * 1. checkpoint the state manager -- even if we crash before this step, EOS is still guaranteed
- * 2. then if we are closing on EOS and dirty, wipe out the state store directory
- * 3. finally release the state manager lock
- * </pre>
- */
- private void close(final boolean clean) {
- if (clean) {
- executeAndMaybeSwallow(true, this::writeCheckpointIfNeed, "state manager checkpoint", log);
- }
-
- switch (state()) {
- case CREATED:
- case RESTORING:
- case SUSPENDED:
- // first close state manager (which is idempotent) then close the record collector
- // if the latter throws and we re-close dirty which would close the state manager again.
- executeAndMaybeSwallow(
- clean,
- () -> StateManagerUtil.closeStateManager(
- log,
- logPrefix,
- clean,
- eosEnabled,
- stateMgr,
- stateDirectory,
- TaskType.ACTIVE
- ),
- "state manager close",
- log);
-
- executeAndMaybeSwallow(clean, recordCollector::close, "record collector close", log);
-
- break;
-
- case CLOSED:
- log.trace("Skip closing since state is {}", state());
- return;
-
- case RUNNING:
- throw new IllegalStateException("Illegal state " + state() + " while closing active task " + id);
-
- default:
- throw new IllegalStateException("Unknown state " + state() + " while closing active task " + id);
- }
-
- partitionGroup.clear();
- closeTaskSensor.record();
-
- transitionTo(State.CLOSED);
- }
-
- /**
- * An active task is processable if its buffer contains data for all of its input
- * source topic partitions, or if it is enforced to be processable
+ * Adds records to queues. If a record has an invalid (i.e., negative) timestamp, the record is skipped
+ * and not added to the queue for processing
+ *
+ * @param partition the partition
+ * @param records the records
*/
- public boolean isProcessable(final long wallClockTime) {
- if (state() == State.CLOSED) {
- // a task is only closing / closed when 1) task manager is closing, 2) a rebalance is undergoing;
- // in either case we can just log it and move on without notifying the thread since the consumer
- // would soon be updated to not return any records for this task anymore.
- log.info("Stream task {} is already in {} state, skip processing it.", id(), state());
+ @Override
+ public void addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
+ final int newQueueSize = partitionGroup.addRawRecords(partition, records);
- return false;
+ if (log.isTraceEnabled()) {
+ log.trace("Added records into the buffered queue of partition {}, new queue size is {}", partition, newQueueSize);
}
- if (partitionGroup.allPartitionsBuffered()) {
- idleStartTimeMs = RecordQueue.UNKNOWN;
- return true;
- } else if (partitionGroup.numBuffered() > 0) {
- if (idleStartTimeMs == RecordQueue.UNKNOWN) {
- idleStartTimeMs = wallClockTime;
- }
-
- if (wallClockTime - idleStartTimeMs >= maxTaskIdleMs) {
- enforcedProcessingSensor.record(1.0d, wallClockTime);
- return true;
- } else {
- return false;
- }
- } else {
- // there's no data in any of the topics; we should reset the enforced
- // processing timer
- idleStartTimeMs = RecordQueue.UNKNOWN;
- return false;
+ // if after adding these records, its partition queue's buffered size has been
+ // increased beyond the threshold, we can then pause the consumption for this partition
+ if (newQueueSize > maxBufferedSize) {
+ mainConsumer.pause(singleton(partition));
}
}
@@ -673,13 +545,13 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
} catch (final RuntimeException e) {
final String stackTrace = getStacktraceString(e);
throw new StreamsException(String.format("Exception caught in process. taskId=%s, " +
- "processor=%s, topic=%s, partition=%d, offset=%d, stacktrace=%s",
- id(),
- processorContext.currentNode().name(),
- record.topic(),
- record.partition(),
- record.offset(),
- stackTrace
+ "processor=%s, topic=%s, partition=%d, offset=%d, stacktrace=%s",
+ id(),
+ processorContext.currentNode().name(),
+ record.topic(),
+ record.partition(),
+ record.offset(),
+ stackTrace
), e);
} finally {
processorContext.setCurrentNode(null);
@@ -688,6 +560,18 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
return true;
}
+ private String getStacktraceString(final RuntimeException e) {
+ String stacktrace = null;
+ try (final StringWriter stringWriter = new StringWriter();
+ final PrintWriter printWriter = new PrintWriter(stringWriter)) {
+ e.printStackTrace(printWriter);
+ stacktrace = stringWriter.toString();
+ } catch (final IOException ioe) {
+ log.error("Encountered error extracting stacktrace from this exception", ioe);
+ }
+ return stacktrace;
+ }
+
@Override
public void recordProcessBatchTime(final long processBatchTime) {
processTimeMs += processBatchTime;
@@ -700,16 +584,50 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
processTimeMs = 0L;
}
- private String getStacktraceString(final RuntimeException e) {
- String stacktrace = null;
- try (final StringWriter stringWriter = new StringWriter();
- final PrintWriter printWriter = new PrintWriter(stringWriter)) {
- e.printStackTrace(printWriter);
- stacktrace = stringWriter.toString();
- } catch (final IOException ioe) {
- log.error("Encountered error extracting stacktrace from this exception", ioe);
+ /**
+ * Possibly trigger registered stream-time punctuation functions if
+ * current partition group timestamp has reached the defined stamp
+ * Note, this is only called in the presence of new records
+ *
+ * @throws TaskMigratedException if the task producer got fenced (EOS only)
+ */
+ @Override
+ public boolean maybePunctuateStreamTime() {
+ final long streamTime = partitionGroup.streamTime();
+
+ // if the timestamp is not known yet, meaning there is not enough data accumulated
+        // to reason about stream partition time, then skip.
+ if (streamTime == RecordQueue.UNKNOWN) {
+ return false;
+ } else {
+ final boolean punctuated = streamTimePunctuationQueue.mayPunctuate(streamTime, PunctuationType.STREAM_TIME, this);
+
+ if (punctuated) {
+ commitNeeded = true;
+ }
+
+ return punctuated;
}
- return stacktrace;
+ }
+
+ /**
+ * Possibly trigger registered system-time punctuation functions if
+ * current system timestamp has reached the defined stamp
+ * Note, this is called irrespective of the presence of new records
+ *
+ * @throws TaskMigratedException if the task producer got fenced (EOS only)
+ */
+ @Override
+ public boolean maybePunctuateSystemTime() {
+ final long systemTime = time.milliseconds();
+
+ final boolean punctuated = systemTimePunctuationQueue.mayPunctuate(systemTime, PunctuationType.WALL_CLOCK_TIME, this);
+
+ if (punctuated) {
+ commitNeeded = true;
+ }
+
+ return punctuated;
}
/**
@@ -742,16 +660,69 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
}
}
- private void updateProcessorContext(final StampedRecord record, final ProcessorNode<?, ?> currNode, final long wallClockTime) {
- processorContext.setRecordContext(
- new ProcessorRecordContext(
- record.timestamp,
- record.offset(),
- record.partition(),
- record.topic(),
- record.headers()));
- processorContext.setCurrentNode(currNode);
- processorContext.setSystemTimeMs(wallClockTime);
+ @Override
+ public boolean commitNeeded() {
+ return commitNeeded;
+ }
+
+ /**
+ * Whether or not a request has been made to commit the current state
+ */
+ @Override
+ public boolean commitRequested() {
+ return commitRequested;
+ }
+
+ /**
+ * @return offsets that should be committed for this task
+ */
+ @Override
+ public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+ switch (state()) {
+ case RUNNING:
+ case RESTORING:
+ case SUSPENDED:
+ maybeScheduleCheckpoint();
+ stateMgr.flush();
+ recordCollector.flush();
+
+ log.debug("Prepared task for committing");
+
+ break;
+
+ case CREATED:
+ case CLOSED:
+ throw new IllegalStateException("Illegal state " + state() + " while preparing active task " + id + " for committing");
+
+ default:
+ throw new IllegalStateException("Unknown state " + state() + " while preparing active task " + id + " for committing");
+ }
+
+ return committableOffsetsAndMetadata();
+ }
+
+ private void maybeScheduleCheckpoint() {
+ switch (state()) {
+ case RESTORING:
+ case SUSPENDED:
+ this.checkpoint = checkpointableOffsets();
+
+ break;
+
+ case RUNNING:
+ if (!eosEnabled) {
+ this.checkpoint = checkpointableOffsets();
+ }
+
+ break;
+
+ case CREATED:
+ case CLOSED:
+ throw new IllegalStateException("Illegal state " + state() + " while scheduling checkpoint for active task " + id);
+
+ default:
+ throw new IllegalStateException("Unknown state " + state() + " while scheduling checkpoint for active task " + id);
+ }
}
/**
@@ -763,47 +734,100 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
checkpointableOffsets.putIfAbsent(entry.getKey(), entry.getValue());
}
- return checkpointableOffsets;
+ return checkpointableOffsets;
+ }
+
+    private Map<TopicPartition, OffsetAndMetadata> committableOffsetsAndMetadata() {
+        final Map<TopicPartition, OffsetAndMetadata> committableOffsets;
+        // CREATED/RESTORING: nothing to commit yet; RUNNING/SUSPENDED: consumed offsets plus encoded partition time
+        switch (state()) {
+            case CREATED:
+            case RESTORING:
+                committableOffsets = Collections.emptyMap();
+
+                break;
+
+            case RUNNING:
+            case SUSPENDED:
+                final Map<TopicPartition, Long> partitionTimes = extractPartitionTimes();
+
+                committableOffsets = new HashMap<>(consumedOffsets.size());
+                for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
+                    final TopicPartition partition = entry.getKey();
+                    Long offset = partitionGroup.headRecordOffset(partition);
+                    if (offset == null) {
+                        try {
+                            offset = mainConsumer.position(partition);
+                        } catch (final TimeoutException error) {
+                            // the `consumer.position()` call should never block, because we know that we did process data
+                            // for the requested partition and thus the consumer should have a valid local position
+                            // that it can return immediately
+
+                            // hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException`
+                            throw new IllegalStateException(error);
+                        } catch (final KafkaException fatal) {
+                            throw new StreamsException(fatal);
+                        }
+                    }
+                    final long partitionTime = partitionTimes.get(partition);
+                    committableOffsets.put(partition, new OffsetAndMetadata(offset, encodeTimestamp(partitionTime)));
+                }
+
+                break;
+
+            case CLOSED:
+                throw new IllegalStateException("Illegal state " + state() + " while getting committable offsets for active task " + id);
+
+            default:
+                throw new IllegalStateException("Unknown state " + state() + " while getting committable offsets for active task " + id);
+        }
+
+        return committableOffsets;
+    }
+
+ private Map<TopicPartition, Long> extractPartitionTimes() {
+ final Map<TopicPartition, Long> partitionTimes = new HashMap<>();
+ for (final TopicPartition partition : partitionGroup.partitions()) {
+ partitionTimes.put(partition, partitionGroup.partitionTimestamp(partition));
+ }
+ return partitionTimes;
}
- private void initializeMetadata() {
- try {
- final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata = mainConsumer.committed(inputPartitions()).entrySet().stream()
- .filter(e -> e.getValue() != null)
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
- initializeTaskTime(offsetsAndMetadata);
- } catch (final TimeoutException e) {
- log.warn("Encountered {} while trying to fetch committed offsets, will retry initializing the metadata in the next loop." +
- "\nConsider overwriting consumer config {} to a larger value to avoid timeout errors",
- e.toString(),
- ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
+ @Override
+ public void postCommit() {
+ commitRequested = false;
+ commitNeeded = false;
- throw e;
- } catch (final KafkaException e) {
- throw new StreamsException(String.format("task [%s] Failed to initialize offsets for %s", id, inputPartitions()), e);
- }
- }
+ switch (state()) {
+ case RESTORING:
+ writeCheckpointIfNeed();
- private void initializeTaskTime(final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata) {
- for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsetsAndMetadata.entrySet()) {
- final TopicPartition partition = entry.getKey();
- final OffsetAndMetadata metadata = entry.getValue();
+ break;
- if (metadata != null) {
- final long committedTimestamp = decodeTimestamp(metadata.metadata());
- partitionGroup.setPartitionTime(partition, committedTimestamp);
- log.debug("A committed timestamp was detected: setting the partition time of partition {}"
- + " to {} in stream task {}", partition, committedTimestamp, id);
- } else {
- log.debug("No committed timestamp was found in metadata for partition {}", partition);
- }
- }
+ case RUNNING:
+ if (!eosEnabled) { // if RUNNING, checkpoint only for non-eos
+ writeCheckpointIfNeed();
+ }
- final Set<TopicPartition> nonCommitted = new HashSet<>(inputPartitions());
- nonCommitted.removeAll(offsetsAndMetadata.keySet());
- for (final TopicPartition partition : nonCommitted) {
- log.debug("No committed offset for partition {}, therefore no timestamp can be found for this partition", partition);
+ break;
+
+ case SUSPENDED:
+ writeCheckpointIfNeed();
+ // we cannot `clear()` the `PartitionGroup` in `suspend()` already, but only after committing,
+                // because otherwise we would lose the partition-time information
+ partitionGroup.clear();
+
+ break;
+
+ case CREATED:
+ case CLOSED:
+ throw new IllegalStateException("Illegal state " + state() + " while post committing active task " + id);
+
+ default:
+ throw new IllegalStateException("Unknown state " + state() + " while post committing active task " + id);
}
+
+ log.debug("Committed");
}
@Override
@@ -819,38 +843,72 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
return purgeableConsumedOffsets;
}
- private void initializeTopology() {
- // initialize the task by initializing all its processor nodes in the topology
- log.trace("Initializing processor nodes of the topology");
- for (final ProcessorNode<?, ?> node : topology.processors()) {
- processorContext.setCurrentNode(node);
- try {
- node.init(processorContext);
- } finally {
- processorContext.setCurrentNode(null);
- }
+ @Override
+ public boolean isActive() {
+ return true;
+ }
+
+ @Override
+ public void updateInputPartitions(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) {
+ super.updateInputPartitions(topicPartitions, nodeToSourceTopics);
+ partitionGroup.updatePartitions(topicPartitions, recordQueueCreator::createQueue);
+ }
+
+ @Override
+ public Map<TopicPartition, Long> changelogOffsets() {
+ if (state() == State.RUNNING) {
+ // if we are in running state, just return the latest offset sentinel indicating
+ // we should be at the end of the changelog
+ return changelogPartitions().stream()
+ .collect(Collectors.toMap(Function.identity(), tp -> Task.LATEST_OFFSET));
+ } else {
+ return Collections.unmodifiableMap(stateMgr.changelogOffsets());
+ }
+ }
+
+ private void writeCheckpointIfNeed() {
+ if (commitNeeded) {
+ throw new IllegalStateException("A checkpoint should only be written if no commit is needed.");
+ }
+ if (checkpoint != null) {
+ stateMgr.checkpoint(checkpoint);
+ checkpoint = null;
}
}
/**
- * Adds records to queues. If a record has an invalid (i.e., negative) timestamp, the record is skipped
- * and not added to the queue for processing
- *
- * @param partition the partition
- * @param records the records
+ * An active task is processable if its buffer contains data for all of its input
+ * source topic partitions, or if it is enforced to be processable
*/
- @Override
- public void addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
- final int newQueueSize = partitionGroup.addRawRecords(partition, records);
+ public boolean isProcessable(final long wallClockTime) {
+ if (state() == State.CLOSED) {
+ // a task is only closing / closed when 1) task manager is closing, 2) a rebalance is undergoing;
+ // in either case we can just log it and move on without notifying the thread since the consumer
+ // would soon be updated to not return any records for this task anymore.
+ log.info("Stream task {} is already in {} state, skip processing it.", id(), state());
- if (log.isTraceEnabled()) {
- log.trace("Added records into the buffered queue of partition {}, new queue size is {}", partition, newQueueSize);
+ return false;
}
- // if after adding these records, its partition queue's buffered size has been
- // increased beyond the threshold, we can then pause the consumption for this partition
- if (newQueueSize > maxBufferedSize) {
- mainConsumer.pause(singleton(partition));
+ if (partitionGroup.allPartitionsBuffered()) {
+ idleStartTimeMs = RecordQueue.UNKNOWN;
+ return true;
+ } else if (partitionGroup.numBuffered() > 0) {
+ if (idleStartTimeMs == RecordQueue.UNKNOWN) {
+ idleStartTimeMs = wallClockTime;
+ }
+
+ if (wallClockTime - idleStartTimeMs >= maxTaskIdleMs) {
+ enforcedProcessingSensor.record(1.0d, wallClockTime);
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ // there's no data in any of the topics; we should reset the enforced
+ // processing timer
+ idleStartTimeMs = RecordQueue.UNKNOWN;
+ return false;
}
}
@@ -902,48 +960,23 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
}
}
- /**
- * Possibly trigger registered stream-time punctuation functions if
- * current partition group timestamp has reached the defined stamp
- * Note, this is only called in the presence of new records
- *
- * @throws TaskMigratedException if the task producer got fenced (EOS only)
- */
- public boolean maybePunctuateStreamTime() {
- final long streamTime = partitionGroup.streamTime();
-
- // if the timestamp is not known yet, meaning there is not enough data accumulated
- // to reason stream partition time, then skip.
- if (streamTime == RecordQueue.UNKNOWN) {
- return false;
- } else {
- final boolean punctuated = streamTimePunctuationQueue.mayPunctuate(streamTime, PunctuationType.STREAM_TIME, this);
-
- if (punctuated) {
- commitNeeded = true;
- }
-
- return punctuated;
- }
+ private void updateProcessorContext(final StampedRecord record, final ProcessorNode<?, ?> currNode, final long wallClockTime) {
+ processorContext.setRecordContext(
+ new ProcessorRecordContext(
+ record.timestamp,
+ record.offset(),
+ record.partition(),
+ record.topic(),
+ record.headers()));
+ processorContext.setCurrentNode(currNode);
+ processorContext.setSystemTimeMs(wallClockTime);
}
/**
- * Possibly trigger registered system-time punctuation functions if
- * current system timestamp has reached the defined stamp
- * Note, this is called irrespective of the presence of new records
- *
- * @throws TaskMigratedException if the task producer got fenced (EOS only)
+ * Request committing the current task's state
*/
- public boolean maybePunctuateSystemTime() {
- final long systemTime = time.milliseconds();
-
- final boolean punctuated = systemTimePunctuationQueue.mayPunctuate(systemTime, PunctuationType.WALL_CLOCK_TIME, this);
-
- if (punctuated) {
- commitNeeded = true;
- }
-
- return punctuated;
+ void requestCommit() {
+ commitRequested = true;
}
void maybeRecordE2ELatency(final long recordTimestamp, final String nodeName) {
@@ -959,21 +992,15 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
}
}
- /**
- * Request committing the current task's state
- */
- void requestCommit() {
- commitRequested = true;
+ public InternalProcessorContext processorContext() {
+ return processorContext;
}
- /**
- * Whether or not a request has been made to commit the current state
- */
- @Override
- public boolean commitRequested() {
- return commitRequested;
+ public boolean hasRecordsQueued() {
+ return numBuffered() > 0;
}
+ // visible for testing
static String encodeTimestamp(final long partitionTime) {
final ByteBuffer buffer = ByteBuffer.allocate(9);
buffer.put(LATEST_MAGIC_BYTE);
@@ -981,6 +1008,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
return Base64.getEncoder().encodeToString(buffer.array());
}
+ // visible for testing
long decodeTimestamp(final String encryptedString) {
if (encryptedString.isEmpty()) {
return RecordQueue.UNKNOWN;
@@ -992,15 +1020,29 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
return buffer.getLong();
default:
log.warn("Unsupported offset metadata version found. Supported version {}. Found version {}.",
- LATEST_MAGIC_BYTE, version);
+ LATEST_MAGIC_BYTE, version);
return RecordQueue.UNKNOWN;
}
}
- public InternalProcessorContext processorContext() {
- return processorContext;
+
+
+ // for testing only
+
+ RecordCollector recordCollector() {
+ return recordCollector;
+ }
+
+ int numBuffered() {
+ return partitionGroup.numBuffered();
+ }
+
+ long streamTime() {
+ return partitionGroup.streamTime();
}
+
+
/**
* Produces a string representation containing useful information about a Task.
* This is useful in debugging scenarios.
@@ -1043,40 +1085,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
return sb.toString();
}
- @Override
- public boolean commitNeeded() {
- return commitNeeded;
- }
-
- @Override
- public Map<TopicPartition, Long> changelogOffsets() {
- if (state() == State.RUNNING) {
- // if we are in running state, just return the latest offset sentinel indicating
- // we should be at the end of the changelog
- return changelogPartitions().stream()
- .collect(Collectors.toMap(Function.identity(), tp -> Task.LATEST_OFFSET));
- } else {
- return Collections.unmodifiableMap(stateMgr.changelogOffsets());
- }
- }
-
- public boolean hasRecordsQueued() {
- return numBuffered() > 0;
- }
-
- RecordCollector recordCollector() {
- return recordCollector;
- }
-
- // below are visible for testing only
- int numBuffered() {
- return partitionGroup.numBuffered();
- }
-
- long streamTime() {
- return partitionGroup.streamTime();
- }
-
private class RecordQueueCreator {
private final LogContext logContext;
private final TimestampExtractor defaultTimestampExtractor;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index 62332c7..2032c66 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;
-import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
@@ -29,6 +28,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -97,13 +97,9 @@ public interface Task {
}
}
- TaskId id();
- State state();
- boolean isActive();
-
- boolean isClosed();
+ // idempotent life-cycle methods
/**
* @throws LockException could happen when multi-threads within the single instance, could retry
@@ -116,89 +112,100 @@ public interface Task {
*/
void completeRestoration();
- void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records);
-
- boolean commitNeeded();
-
- /**
- * @throws StreamsException fatal error, should close the thread
- */
- Map<TopicPartition, OffsetAndMetadata> prepareCommit();
-
- void postCommit();
-
void suspend();
/**
- *
* @throws StreamsException fatal error, should close the thread
*/
void resume();
- /**
- * Must be idempotent.
- */
+ void closeDirty();
+
void closeClean();
- /**
- * Must be idempotent.
- */
- void closeDirty();
+
+ // non-idempotent life-cycle methods
/**
- * Updates input partitions and topology after rebalance
+ * Revive a closed task to a created one; should never throw an exception
*/
- void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics);
+ void revive();
/**
* Attempt a clean close but do not close the underlying state
*/
void closeAndRecycleState();
- /**
- * Revive a closed task to a created one; should never throw an exception
- */
- void revive();
-
- StateStore getStore(final String name);
-
- Set<TopicPartition> inputPartitions();
+ void markChangelogAsCorrupted(final Collection<TopicPartition> partitions);
- /**
- * @return any changelog partitions associated with this task
- */
- Collection<TopicPartition> changelogPartitions();
- /**
- * @return the offsets of all the changelog partitions associated with this task,
- * indicating the current positions of the logged state stores of the task.
- */
- Map<TopicPartition, Long> changelogOffsets();
+    // runtime methods (used in RUNNING state)
- void markChangelogAsCorrupted(final Collection<TopicPartition> partitions);
+ void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records);
- default Map<TopicPartition, Long> purgeableOffsets() {
- return Collections.emptyMap();
+ default boolean process(final long wallClockTime) {
+ return false;
}
default void recordProcessBatchTime(final long processBatchTime) {}
default void recordProcessTimeRatioAndBufferSize(final long allTaskProcessMs, final long now) {}
- default boolean process(final long wallClockTime) {
+ default boolean maybePunctuateStreamTime() {
return false;
}
- default boolean commitRequested() {
+ default boolean maybePunctuateSystemTime() {
return false;
}
- default boolean maybePunctuateStreamTime() {
+ boolean commitNeeded();
+
+ default boolean commitRequested() {
return false;
}
- default boolean maybePunctuateSystemTime() {
- return false;
+ /**
+ * @throws StreamsException fatal error, should close the thread
+ */
+ Map<TopicPartition, OffsetAndMetadata> prepareCommit();
+
+ void postCommit();
+
+ default Map<TopicPartition, Long> purgeableOffsets() {
+ return Collections.emptyMap();
}
+
+ // task status inquiry
+
+ TaskId id();
+
+ State state();
+
+ boolean isActive();
+
+ /**
+ * Updates input partitions after a rebalance
+ */
+ void updateInputPartitions(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics);
+
+ Set<TopicPartition> inputPartitions();
+
+ /**
+ * @return any changelog partitions associated with this task
+ */
+ Collection<TopicPartition> changelogPartitions();
+
+
+ // IQ related methods
+
+ StateStore getStore(final String name);
+
+ /**
+ * @return the offsets of all the changelog partitions associated with this task,
+ * indicating the current positions of the logged state stores of the task.
+ */
+ Map<TopicPartition, Long> changelogOffsets();
+
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index fdbaf1c..4d2948a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -359,7 +359,7 @@ public class TaskManager {
for (final TopicPartition topicPartition : topicPartitions) {
partitionToTask.put(topicPartition, task);
}
- task.update(topicPartitions, builder.nodeToSourceTopics());
+ task.updateInputPartitions(topicPartitions, builder.nodeToSourceTopics());
}
task.resume();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 8784cf1..667d16b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -479,14 +479,15 @@ public class StandbyTaskTest {
return new StandbyTask(
taskId,
- Collections.singleton(partition),
topology,
- config,
- streamsMetrics,
- stateManager,
stateDirectory,
+ stateManager,
+ Collections.singleton(partition),
+ config,
+ context,
cache,
- context);
+ streamsMetrics
+ );
}
private MetricName setupCloseTaskMetric() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 8f8bb54..030b630 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;
-import java.util.HashSet;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
@@ -74,6 +73,7 @@ import java.time.Duration;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -1452,17 +1452,18 @@ public class StreamTaskTest {
task = new StreamTask(
taskId,
- mkSet(partition1, repartition),
topology,
- consumer,
- config,
- streamsMetrics,
stateDirectory,
+ stateManager,
+ mkSet(partition1, repartition),
+ config,
+ context,
cache,
+ streamsMetrics,
time,
- stateManager,
- recordCollector,
- context);
+ consumer,
+ recordCollector
+ );
task.initializeIfNeeded();
task.completeRestoration();
@@ -1777,7 +1778,7 @@ public class StreamTaskTest {
final Set<TopicPartition> newPartitions = new HashSet<>(task.inputPartitions());
newPartitions.add(new TopicPartition("newTopic", 0));
- task.update(newPartitions, mkMap(
+ task.updateInputPartitions(newPartitions, mkMap(
mkEntry(source1.name(), asList(topic1, "newTopic")),
mkEntry(source2.name(), singletonList(topic2)))
);
@@ -1828,17 +1829,18 @@ public class StreamTaskTest {
return new StreamTask(
taskId,
- mkSet(partition1),
topology,
- consumer,
- config,
- streamsMetrics,
stateDirectory,
+ stateManager,
+ mkSet(partition1),
+ config,
+ context,
cache,
+ streamsMetrics,
time,
- stateManager,
- recordCollector,
- context);
+ consumer,
+ recordCollector
+ );
}
private StreamTask createDisconnectedTask(final StreamsConfig config) {
@@ -1867,17 +1869,18 @@ public class StreamTaskTest {
return new StreamTask(
taskId,
- partitions,
topology,
- consumer,
- config,
- streamsMetrics,
stateDirectory,
+ stateManager,
+ partitions,
+ config,
+ context,
cache,
+ streamsMetrics,
time,
- stateManager,
- recordCollector,
- context);
+ consumer,
+ recordCollector
+ );
}
private StreamTask createFaultyStatefulTask(final StreamsConfig config) {
@@ -1897,17 +1900,18 @@ public class StreamTaskTest {
return new StreamTask(
taskId,
- partitions,
topology,
- consumer,
- config,
- streamsMetrics,
stateDirectory,
+ stateManager,
+ partitions,
+ config,
+ context,
cache,
+ streamsMetrics,
time,
- stateManager,
- recordCollector,
- context);
+ consumer,
+ recordCollector
+ );
}
private StreamTask createStatefulTask(final StreamsConfig config, final boolean logged) {
@@ -1933,17 +1937,18 @@ public class StreamTaskTest {
return new StreamTask(
taskId,
- partitions,
topology,
- consumer,
- config,
- streamsMetrics,
stateDirectory,
+ stateManager,
+ partitions,
+ config,
+ context,
cache,
+ streamsMetrics,
time,
- stateManager,
- recordCollector,
- context);
+ consumer,
+ recordCollector
+ );
}
private StreamTask createStatelessTask(final StreamsConfig config,
@@ -1972,17 +1977,18 @@ public class StreamTaskTest {
return new StreamTask(
taskId,
- partitions,
topology,
- consumer,
- config,
- new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion),
stateDirectory,
+ stateManager,
+ partitions,
+ config,
+ context,
cache,
+ new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion),
time,
- stateManager,
- recordCollector,
- context);
+ consumer,
+ recordCollector
+ );
}
private ConsumerRecord<byte[], byte[]> getConsumerRecord(final TopicPartition topicPartition, final long offset) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 8be3c21..0c01b5b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -2706,7 +2706,8 @@ public class TaskManagerTest {
}
@Override
- public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) {
+ public void updateInputPartitions(final Set<TopicPartition> topicPartitions,
+ final Map<String, List<String>> nodeToSourceTopics) {
inputPartitions = topicPartitions;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index a171a46..81ba22d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -390,17 +390,18 @@ public class StreamThreadStateStoreProviderTest {
);
return new StreamTask(
taskId,
- partitions,
topology,
- clientSupplier.consumer,
- streamsConfig,
- streamsMetrics,
stateDirectory,
+ stateManager,
+ partitions,
+ streamsConfig,
+ context,
EasyMock.createNiceMock(ThreadCache.class),
+ streamsMetrics,
new MockTime(),
- stateManager,
- recordCollector,
- context);
+ clientSupplier.consumer,
+ recordCollector
+ );
}
private void mockThread(final boolean initialized) {
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 2ca187b..82d892f 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -501,17 +501,17 @@ public class TopologyTestDriver implements Closeable {
task = new StreamTask(
TASK_ID,
- new HashSet<>(partitionsByInputTopic.values()),
processorTopology,
- consumer,
- streamsConfig,
- streamsMetrics,
stateDirectory,
+ stateManager,
+ new HashSet<>(partitionsByInputTopic.values()),
+ streamsConfig,
+ context,
cache,
+ streamsMetrics,
mockWallClockTime,
- stateManager,
- recordCollector,
- context
+ consumer,
+ recordCollector
);
task.initializeIfNeeded();
task.completeRestoration();