You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gh...@apache.org on 2024/02/05 18:56:05 UTC
(kafka) branch trunk updated: KAFKA-14683: Migrate WorkerSinkTaskTest to Mockito (1/3) (#14663)
This is an automated email from the ASF dual-hosted git repository.
gharris pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 24a79c2613e KAFKA-14683: Migrate WorkerSinkTaskTest to Mockito (1/3) (#14663)
24a79c2613e is described below
commit 24a79c2613e32b1d1615f6f34d63225259740d0e
Author: Hector Geraldino <hg...@gmail.com>
AuthorDate: Mon Feb 5 13:55:57 2024 -0500
KAFKA-14683: Migrate WorkerSinkTaskTest to Mockito (1/3) (#14663)
Reviewers: Greg Harris <gr...@aiven.io>
---
.../connect/runtime/WorkerSinkTaskMockitoTest.java | 672 +++++++++++++++++++++
.../kafka/connect/runtime/WorkerSinkTaskTest.java | 488 ---------------
2 files changed, 672 insertions(+), 488 deletions(-)
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java
new file mode 100644
index 00000000000..b41f571d635
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java
@@ -0,0 +1,672 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.WorkerSinkTask.SinkTaskMetricsGroup;
+import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
+import org.apache.kafka.connect.runtime.errors.ErrorReporter;
+import org.apache.kafka.connect.runtime.errors.ProcessingContext;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
+import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.ClusterConfigState;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.junit.MockitoRule;
+import org.mockito.quality.Strictness;
+import org.mockito.stubbing.Answer;
+import org.mockito.stubbing.OngoingStubbing;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class WorkerSinkTaskMockitoTest {
+ // These are fixed to keep this code simpler. In this example we assume byte[] raw values
+ // with mix of integer/string in Connect
+ private static final String TOPIC = "test";
+ private static final int PARTITION = 12;
+ private static final int PARTITION2 = 13;
+ private static final int PARTITION3 = 14;
+ private static final long FIRST_OFFSET = 45;
+ private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
+ private static final int KEY = 12;
+ private static final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA;
+ private static final String VALUE = "VALUE";
+ private static final byte[] RAW_KEY = "key".getBytes();
+ private static final byte[] RAW_VALUE = "value".getBytes();
+
+ private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION);
+ private static final TopicPartition TOPIC_PARTITION2 = new TopicPartition(TOPIC, PARTITION2);
+ private static final TopicPartition TOPIC_PARTITION3 = new TopicPartition(TOPIC, PARTITION3);
+
+ private static final Set<TopicPartition> INITIAL_ASSIGNMENT =
+ new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
+
+ private static final Map<String, String> TASK_PROPS = new HashMap<>();
+
+ static {
+ TASK_PROPS.put(SinkConnector.TOPICS_CONFIG, TOPIC);
+ TASK_PROPS.put(TaskConfig.TASK_CLASS_CONFIG, SinkTask.class.getName());
+ }
+
+ private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
+
+ private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+ private ConnectorTaskId taskId1 = new ConnectorTaskId("job", 1);
+ private TargetState initialState = TargetState.STARTED;
+ private MockTime time;
+ private WorkerSinkTask workerTask;
+ @Mock
+ private SinkTask sinkTask;
+ private ArgumentCaptor<WorkerSinkTaskContext> sinkTaskContext = ArgumentCaptor.forClass(WorkerSinkTaskContext.class);
+ private WorkerConfig workerConfig;
+ private MockConnectMetrics metrics;
+ @Mock
+ private PluginClassLoader pluginLoader;
+ @Mock
+ private Converter keyConverter;
+ @Mock
+ private Converter valueConverter;
+ @Mock
+ private HeaderConverter headerConverter;
+ @Mock
+ private TransformationChain<ConsumerRecord<byte[], byte[]>, SinkRecord> transformationChain;
+ @Mock
+ private TaskStatus.Listener statusListener;
+ @Mock
+ private StatusBackingStore statusBackingStore;
+ @Mock
+ private KafkaConsumer<byte[], byte[]> consumer;
+ @Mock
+ private ErrorHandlingMetrics errorHandlingMetrics;
+ private ArgumentCaptor<ConsumerRebalanceListener> rebalanceListener = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+ @Rule
+ public final MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
+
+ private long recordsReturnedTp1;
+ private long recordsReturnedTp3;
+
+ @Before
+ public void setUp() {
+ time = new MockTime();
+ Map<String, String> workerProps = new HashMap<>();
+ workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+ workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+ workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
+ workerConfig = new StandaloneConfig(workerProps);
+ metrics = new MockConnectMetrics(time);
+ recordsReturnedTp1 = 0;
+ recordsReturnedTp3 = 0;
+ }
+
+ private void createTask(TargetState initialState) {
+ createTask(initialState, keyConverter, valueConverter, headerConverter);
+ }
+
+ private void createTask(TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter) {
+ createTask(initialState, keyConverter, valueConverter, headerConverter, RetryWithToleranceOperatorTest.noopOperator(), Collections::emptyList);
+ }
+
+ private void createTask(TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter,
+ RetryWithToleranceOperator<ConsumerRecord<byte[], byte[]>> retryWithToleranceOperator,
+ Supplier<List<ErrorReporter<ConsumerRecord<byte[], byte[]>>>> errorReportersSupplier) {
+ workerTask = new WorkerSinkTask(
+ taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics,
+ keyConverter, valueConverter, errorHandlingMetrics, headerConverter,
+ transformationChain, consumer, pluginLoader, time,
+ retryWithToleranceOperator, null, statusBackingStore, errorReportersSupplier);
+ }
+
+ @After
+ public void tearDown() {
+ if (metrics != null) metrics.stop();
+ }
+
+ @Test
+ public void testStartPaused() {
+ createTask(TargetState.PAUSED);
+
+ expectPollInitialAssignment();
+
+ workerTask.initialize(TASK_CONFIG);
+ workerTask.initializeAndStart();
+ verifyInitializeTask();
+
+ workerTask.iteration();
+ verifyPollInitialAssignment();
+
+ time.sleep(10000L);
+ verify(consumer).pause(INITIAL_ASSIGNMENT);
+
+ assertSinkMetricValue("partition-count", 2);
+ assertTaskMetricValue("status", "paused");
+ assertTaskMetricValue("running-ratio", 0.0);
+ assertTaskMetricValue("pause-ratio", 1.0);
+ assertTaskMetricValue("offset-commit-max-time-ms", Double.NaN);
+ }
+
+ @Test
+ public void testPause() {
+ createTask(initialState);
+
+ workerTask.initialize(TASK_CONFIG);
+ workerTask.initializeAndStart();
+ verifyInitializeTask();
+
+ expectTaskGetTopic();
+ expectPollInitialAssignment()
+ .thenAnswer(expectConsumerPoll(1))
+ // Pause
+ .thenThrow(new WakeupException())
+ // Offset commit as requested when pausing; No records returned by consumer.poll()
+ .thenAnswer(expectConsumerPoll(0))
+ // And unpause
+ .thenThrow(new WakeupException())
+ .thenAnswer(expectConsumerPoll(1));
+
+ expectConversionAndTransformation(null, new RecordHeaders());
+
+ workerTask.iteration(); // initial assignment
+ verifyPollInitialAssignment();
+
+ workerTask.iteration(); // fetch some data
+ // put should've been called twice now (initial assignment & poll)
+ verify(sinkTask, times(2)).put(anyList());
+
+ workerTask.transitionTo(TargetState.PAUSED);
+ time.sleep(10_000L);
+
+ assertSinkMetricValue("partition-count", 2);
+ assertSinkMetricValue("sink-record-read-total", 1.0);
+ assertSinkMetricValue("sink-record-send-total", 1.0);
+ assertSinkMetricValue("sink-record-active-count", 1.0);
+ assertSinkMetricValue("sink-record-active-count-max", 1.0);
+ assertSinkMetricValue("sink-record-active-count-avg", 0.333333);
+ assertSinkMetricValue("offset-commit-seq-no", 0.0);
+ assertSinkMetricValue("offset-commit-completion-rate", 0.0);
+ assertSinkMetricValue("offset-commit-completion-total", 0.0);
+ assertSinkMetricValue("offset-commit-skip-rate", 0.0);
+ assertSinkMetricValue("offset-commit-skip-total", 0.0);
+ assertTaskMetricValue("status", "running");
+ assertTaskMetricValue("running-ratio", 1.0);
+ assertTaskMetricValue("pause-ratio", 0.0);
+ assertTaskMetricValue("batch-size-max", 1.0);
+ assertTaskMetricValue("batch-size-avg", 0.5);
+ assertTaskMetricValue("offset-commit-max-time-ms", Double.NaN);
+ assertTaskMetricValue("offset-commit-failure-percentage", 0.0);
+ assertTaskMetricValue("offset-commit-success-percentage", 0.0);
+
+ workerTask.iteration(); // wakeup
+ // Pause
+ verify(statusListener).onPause(taskId);
+ verify(consumer).pause(INITIAL_ASSIGNMENT);
+ verify(consumer).wakeup();
+
+ // Offset commit as requested when pausing; No records returned by consumer.poll()
+ when(sinkTask.preCommit(anyMap())).thenReturn(Collections.emptyMap());
+
+ workerTask.iteration(); // now paused
+ time.sleep(30000L);
+
+ assertSinkMetricValue("offset-commit-seq-no", 1.0);
+ assertSinkMetricValue("offset-commit-completion-rate", 0.0333);
+ assertSinkMetricValue("offset-commit-completion-total", 1.0);
+ assertSinkMetricValue("offset-commit-skip-rate", 0.0);
+ assertSinkMetricValue("offset-commit-skip-total", 0.0);
+ assertTaskMetricValue("status", "paused");
+ assertTaskMetricValue("running-ratio", 0.25);
+ assertTaskMetricValue("pause-ratio", 0.75);
+ verify(sinkTask, times(3)).put(anyList());
+
+ workerTask.transitionTo(TargetState.STARTED);
+ workerTask.iteration(); // wakeup
+ workerTask.iteration(); // now unpaused
+
+ // And unpause
+ verify(statusListener).onResume(taskId);
+ verify(consumer, times(2)).wakeup();
+ INITIAL_ASSIGNMENT.forEach(tp -> {
+ verify(consumer).resume(Collections.singleton(tp));
+ });
+ verify(sinkTask, times(4)).put(anyList());
+ }
+
+ @Test
+ public void testShutdown() throws Exception {
+ createTask(initialState);
+
+ workerTask.initialize(TASK_CONFIG);
+ workerTask.initializeAndStart();
+ verifyInitializeTask();
+
+ expectTaskGetTopic();
+ expectPollInitialAssignment()
+ .thenAnswer(expectConsumerPoll(1));
+
+ expectConversionAndTransformation(null, new RecordHeaders());
+
+ workerTask.iteration();
+ verifyPollInitialAssignment();
+ sinkTaskContext.getValue().requestCommit(); // Force an offset commit
+
+ // second iteration
+ when(sinkTask.preCommit(anyMap())).thenReturn(Collections.emptyMap());
+
+ workerTask.iteration();
+ verify(sinkTask, times(2)).put(anyList());
+
+ doAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> {
+ rebalanceListener.getValue().onPartitionsRevoked(INITIAL_ASSIGNMENT);
+ return null;
+ }).when(consumer).close();
+
+ workerTask.stop();
+ verify(consumer).wakeup();
+
+ workerTask.close();
+ verify(sinkTask).stop();
+ verify(consumer).close();
+ verify(headerConverter).close();
+ }
+
+ @Test
+ public void testErrorInRebalancePartitionLoss() {
+ RuntimeException exception = new RuntimeException("Revocation error");
+ createTask(initialState);
+
+ workerTask.initialize(TASK_CONFIG);
+ workerTask.initializeAndStart();
+ verifyInitializeTask();
+
+ expectPollInitialAssignment()
+ .thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> {
+ rebalanceListener.getValue().onPartitionsLost(INITIAL_ASSIGNMENT);
+ return ConsumerRecords.empty();
+ });
+
+ doThrow(exception).when(sinkTask).close(INITIAL_ASSIGNMENT);
+
+ workerTask.iteration();
+ verifyPollInitialAssignment();
+
+ try {
+ workerTask.iteration();
+ fail("Poll should have raised the rebalance exception");
+ } catch (RuntimeException e) {
+ assertEquals(exception, e);
+ }
+ }
+
+ @Test
+ public void testErrorInRebalancePartitionRevocation() {
+ RuntimeException exception = new RuntimeException("Revocation error");
+ createTask(initialState);
+
+ workerTask.initialize(TASK_CONFIG);
+ workerTask.initializeAndStart();
+ verifyInitializeTask();
+
+ expectPollInitialAssignment()
+ .thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> {
+ rebalanceListener.getValue().onPartitionsRevoked(INITIAL_ASSIGNMENT);
+ return ConsumerRecords.empty();
+ });
+
+ expectRebalanceRevocationError(exception);
+
+ workerTask.iteration();
+ verifyPollInitialAssignment();
+ try {
+ workerTask.iteration();
+ fail("Poll should have raised the rebalance exception");
+ } catch (RuntimeException e) {
+ assertEquals(exception, e);
+ }
+ }
+
+ @Test
+ public void testErrorInRebalancePartitionAssignment() {
+ RuntimeException exception = new RuntimeException("Assignment error");
+ createTask(initialState);
+
+ workerTask.initialize(TASK_CONFIG);
+ workerTask.initializeAndStart();
+ verifyInitializeTask();
+
+ expectPollInitialAssignment()
+ .thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> {
+ rebalanceListener.getValue().onPartitionsRevoked(INITIAL_ASSIGNMENT);
+ rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT);
+ return ConsumerRecords.empty();
+ });
+
+ workerTask.iteration();
+ verifyPollInitialAssignment();
+
+ expectRebalanceAssignmentError(exception);
+ try {
+ workerTask.iteration();
+ fail("Poll should have raised the rebalance exception");
+ } catch (RuntimeException e) {
+ assertEquals(exception, e);
+ } finally {
+ verify(sinkTask).close(INITIAL_ASSIGNMENT);
+ }
+ }
+
+ @Test
+ public void testPartialRevocationAndAssignment() {
+ createTask(initialState);
+
+ when(consumer.assignment())
+ .thenReturn(INITIAL_ASSIGNMENT)
+ .thenReturn(INITIAL_ASSIGNMENT)
+ .thenReturn(Collections.singleton(TOPIC_PARTITION2))
+ .thenReturn(Collections.singleton(TOPIC_PARTITION2))
+ .thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3)))
+ .thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3)))
+ .thenReturn(INITIAL_ASSIGNMENT)
+ .thenReturn(INITIAL_ASSIGNMENT)
+ .thenReturn(INITIAL_ASSIGNMENT);
+
+ INITIAL_ASSIGNMENT.forEach(tp -> when(consumer.position(tp)).thenReturn(FIRST_OFFSET));
+
+ workerTask.initialize(TASK_CONFIG);
+ workerTask.initializeAndStart();
+ verifyInitializeTask();
+
+ when(consumer.poll(any(Duration.class)))
+ .thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> {
+ rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT);
+ return ConsumerRecords.empty();
+ })
+ .thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> {
+ rebalanceListener.getValue().onPartitionsRevoked(Collections.singleton(TOPIC_PARTITION));
+ rebalanceListener.getValue().onPartitionsAssigned(Collections.emptySet());
+ return ConsumerRecords.empty();
+ })
+ .thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> {
+ rebalanceListener.getValue().onPartitionsRevoked(Collections.emptySet());
+ rebalanceListener.getValue().onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION3));
+ return ConsumerRecords.empty();
+ })
+ .thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> {
+ rebalanceListener.getValue().onPartitionsLost(Collections.singleton(TOPIC_PARTITION3));
+ rebalanceListener.getValue().onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION));
+ return ConsumerRecords.empty();
+ });
+
+ final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+ offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET));
+ when(sinkTask.preCommit(offsets)).thenReturn(offsets);
+
+ when(consumer.position(TOPIC_PARTITION3)).thenReturn(FIRST_OFFSET);
+
+ // First iteration--first call to poll, first consumer assignment
+ workerTask.iteration();
+ verifyPollInitialAssignment();
+
+ // Second iteration--second call to poll, partial consumer revocation
+ workerTask.iteration();
+ verify(sinkTask).close(Collections.singleton(TOPIC_PARTITION));
+ verify(sinkTask, times(2)).put(Collections.emptyList());
+
+ // Third iteration--third call to poll, partial consumer assignment
+ workerTask.iteration();
+ verify(sinkTask).open(Collections.singleton(TOPIC_PARTITION3));
+ verify(sinkTask, times(3)).put(Collections.emptyList());
+
+ // Fourth iteration--fourth call to poll, one partition lost; can't commit offsets for it, one new partition assigned
+ workerTask.iteration();
+ verify(sinkTask).close(Collections.singleton(TOPIC_PARTITION3));
+ verify(sinkTask).open(Collections.singleton(TOPIC_PARTITION));
+ verify(sinkTask, times(4)).put(Collections.emptyList());
+ }
+
+ @Test
+ public void testMetricsGroup() {
+ SinkTaskMetricsGroup group = new SinkTaskMetricsGroup(taskId, metrics);
+ SinkTaskMetricsGroup group1 = new SinkTaskMetricsGroup(taskId1, metrics);
+ for (int i = 0; i != 10; ++i) {
+ group.recordRead(1);
+ group.recordSend(2);
+ group.recordPut(3);
+ group.recordPartitionCount(4);
+ group.recordOffsetSequenceNumber(5);
+ }
+ Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
+ committedOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
+ group.recordCommittedOffsets(committedOffsets);
+ Map<TopicPartition, OffsetAndMetadata> consumedOffsets = new HashMap<>();
+ consumedOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 10));
+ group.recordConsumedOffsets(consumedOffsets);
+
+ for (int i = 0; i != 20; ++i) {
+ group1.recordRead(1);
+ group1.recordSend(2);
+ group1.recordPut(30);
+ group1.recordPartitionCount(40);
+ group1.recordOffsetSequenceNumber(50);
+ }
+ committedOffsets = new HashMap<>();
+ committedOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET + 2));
+ committedOffsets.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET + 3));
+ group1.recordCommittedOffsets(committedOffsets);
+ consumedOffsets = new HashMap<>();
+ consumedOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET + 20));
+ consumedOffsets.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET + 30));
+ group1.recordConsumedOffsets(consumedOffsets);
+
+ assertEquals(0.333, metrics.currentMetricValueAsDouble(group.metricGroup(), "sink-record-read-rate"), 0.001d);
+ assertEquals(0.667, metrics.currentMetricValueAsDouble(group.metricGroup(), "sink-record-send-rate"), 0.001d);
+ assertEquals(9, metrics.currentMetricValueAsDouble(group.metricGroup(), "sink-record-active-count"), 0.001d);
+ assertEquals(4, metrics.currentMetricValueAsDouble(group.metricGroup(), "partition-count"), 0.001d);
+ assertEquals(5, metrics.currentMetricValueAsDouble(group.metricGroup(), "offset-commit-seq-no"), 0.001d);
+ assertEquals(3, metrics.currentMetricValueAsDouble(group.metricGroup(), "put-batch-max-time-ms"), 0.001d);
+
+ // Close the group
+ group.close();
+
+ for (MetricName metricName : group.metricGroup().metrics().metrics().keySet()) {
+ // Metrics for this group should no longer exist
+ assertFalse(group.metricGroup().groupId().includes(metricName));
+ }
+ // Sensors for this group should no longer exist
+ assertNull(group.metricGroup().metrics().getSensor("source-record-poll"));
+ assertNull(group.metricGroup().metrics().getSensor("source-record-write"));
+ assertNull(group.metricGroup().metrics().getSensor("poll-batch-time"));
+
+ assertEquals(0.667, metrics.currentMetricValueAsDouble(group1.metricGroup(), "sink-record-read-rate"), 0.001d);
+ assertEquals(1.333, metrics.currentMetricValueAsDouble(group1.metricGroup(), "sink-record-send-rate"), 0.001d);
+ assertEquals(45, metrics.currentMetricValueAsDouble(group1.metricGroup(), "sink-record-active-count"), 0.001d);
+ assertEquals(40, metrics.currentMetricValueAsDouble(group1.metricGroup(), "partition-count"), 0.001d);
+ assertEquals(50, metrics.currentMetricValueAsDouble(group1.metricGroup(), "offset-commit-seq-no"), 0.001d);
+ assertEquals(30, metrics.currentMetricValueAsDouble(group1.metricGroup(), "put-batch-max-time-ms"), 0.001d);
+ }
+
+ private void expectRebalanceRevocationError(RuntimeException e) {
+ when(sinkTask.preCommit(anyMap())).thenReturn(Collections.emptyMap());
+ doThrow(e).when(sinkTask).close(INITIAL_ASSIGNMENT);
+ }
+
+ private void expectRebalanceAssignmentError(RuntimeException e) {
+ when(sinkTask.preCommit(anyMap())).thenReturn(Collections.emptyMap());
+ when(consumer.position(TOPIC_PARTITION)).thenReturn(FIRST_OFFSET);
+ when(consumer.position(TOPIC_PARTITION2)).thenReturn(FIRST_OFFSET);
+
+ doThrow(e).when(sinkTask).open(INITIAL_ASSIGNMENT);
+ }
+
+ private void verifyInitializeTask() {
+ verify(consumer).subscribe(eq(asList(TOPIC)), rebalanceListener.capture());
+ verify(sinkTask).initialize(sinkTaskContext.capture());
+ verify(sinkTask).start(TASK_PROPS);
+ }
+
+ private OngoingStubbing<ConsumerRecords<byte[], byte[]>> expectPollInitialAssignment() {
+ when(consumer.assignment()).thenReturn(INITIAL_ASSIGNMENT);
+ INITIAL_ASSIGNMENT.forEach(tp -> when(consumer.position(tp)).thenReturn(FIRST_OFFSET));
+
+ return when(consumer.poll(any(Duration.class))).thenAnswer(
+ invocation -> {
+ rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT);
+ return ConsumerRecords.empty();
+ }
+ );
+ }
+
+ private void verifyPollInitialAssignment() {
+ verify(sinkTask).open(INITIAL_ASSIGNMENT);
+ verify(consumer, atLeastOnce()).assignment();
+ verify(sinkTask).put(Collections.emptyList());
+ }
+
+ private Answer<ConsumerRecords<byte[], byte[]>> expectConsumerPoll(final int numMessages) {
+ return expectConsumerPoll(numMessages, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, new RecordHeaders());
+ }
+
+ private Answer<ConsumerRecords<byte[], byte[]>> expectConsumerPoll(final int numMessages, final long timestamp, final TimestampType timestampType, Headers headers) {
+ return invocation -> {
+ List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
+ for (int i = 0; i < numMessages; i++)
+ records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + i, timestamp, timestampType,
+ 0, 0, RAW_KEY, RAW_VALUE, headers, Optional.empty()));
+ recordsReturnedTp1 += numMessages;
+ return new ConsumerRecords<>(
+ numMessages > 0 ?
+ Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), records)
+ : Collections.emptyMap()
+ );
+ };
+ }
+
+ private void expectConversionAndTransformation(final String topicPrefix, final Headers headers) {
+ when(keyConverter.toConnectData(TOPIC, headers, RAW_KEY)).thenReturn(new SchemaAndValue(KEY_SCHEMA, KEY));
+ when(valueConverter.toConnectData(TOPIC, headers, RAW_VALUE)).thenReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE));
+
+ for (Header header : headers) {
+ when(headerConverter.toConnectHeader(TOPIC, header.key(), header.value())).thenReturn(new SchemaAndValue(VALUE_SCHEMA, new String(header.value())));
+ }
+
+ expectTransformation(topicPrefix);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void expectTransformation(final String topicPrefix) {
+ when(transformationChain.apply(any(ProcessingContext.class), any(SinkRecord.class))).thenAnswer((Answer<SinkRecord>)
+ invocation -> {
+ SinkRecord origRecord = invocation.getArgument(1);
+ return topicPrefix != null && !topicPrefix.isEmpty()
+ ? origRecord.newRecord(
+ topicPrefix + origRecord.topic(),
+ origRecord.kafkaPartition(),
+ origRecord.keySchema(),
+ origRecord.key(),
+ origRecord.valueSchema(),
+ origRecord.value(),
+ origRecord.timestamp(),
+ origRecord.headers()
+ ) : origRecord;
+ });
+ }
+
+ private void expectTaskGetTopic() {
+ when(statusBackingStore.getTopic(anyString(), anyString())).thenAnswer((Answer<TopicStatus>) invocation -> {
+ String connector = invocation.getArgument(0, String.class);
+ String topic = invocation.getArgument(1, String.class);
+ return new TopicStatus(topic, new ConnectorTaskId(connector, 0), Time.SYSTEM.milliseconds());
+ });
+ }
+
+ private void assertSinkMetricValue(String name, double expected) {
+ MetricGroup sinkTaskGroup = workerTask.sinkTaskMetricsGroup().metricGroup();
+ double measured = metrics.currentMetricValueAsDouble(sinkTaskGroup, name);
+ assertEquals(expected, measured, 0.001d);
+ }
+
+ private void assertTaskMetricValue(String name, double expected) {
+ MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup();
+ double measured = metrics.currentMetricValueAsDouble(taskGroup, name);
+ assertEquals(expected, measured, 0.001d);
+ }
+
+ private void assertTaskMetricValue(String name, String expected) {
+ MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup();
+ String measured = metrics.currentMetricValueAsString(taskGroup, name);
+ assertEquals(expected, measured);
+ }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index d2582501934..66edf6a0717 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
-import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Header;
@@ -38,7 +37,6 @@ import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
-import org.apache.kafka.connect.runtime.WorkerSinkTask.SinkTaskMetricsGroup;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
@@ -205,175 +203,6 @@ public class WorkerSinkTaskTest {
if (metrics != null) metrics.stop();
}
- @Test
- public void testStartPaused() throws Exception {
- createTask(TargetState.PAUSED);
-
- expectInitializeTask();
- expectTaskGetTopic(true);
- expectPollInitialAssignment();
-
- EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT);
- consumer.pause(INITIAL_ASSIGNMENT);
- PowerMock.expectLastCall();
-
- PowerMock.replayAll();
-
- workerTask.initialize(TASK_CONFIG);
- workerTask.initializeAndStart();
- workerTask.iteration();
- time.sleep(10000L);
-
- assertSinkMetricValue("partition-count", 2);
- assertTaskMetricValue("status", "paused");
- assertTaskMetricValue("running-ratio", 0.0);
- assertTaskMetricValue("pause-ratio", 1.0);
- assertTaskMetricValue("offset-commit-max-time-ms", Double.NaN);
-
- PowerMock.verifyAll();
- }
-
- @Test
- public void testPause() throws Exception {
- createTask(initialState);
-
- expectInitializeTask();
- expectTaskGetTopic(true);
- expectPollInitialAssignment();
-
- expectConsumerPoll(1);
- expectConversionAndTransformation(1);
- sinkTask.put(EasyMock.anyObject());
- EasyMock.expectLastCall();
-
- // Pause
- statusListener.onPause(taskId);
- EasyMock.expectLastCall();
- expectConsumerWakeup();
- EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT);
- consumer.pause(INITIAL_ASSIGNMENT);
- PowerMock.expectLastCall();
-
- // Offset commit as requested when pausing; No records returned by consumer.poll()
- sinkTask.preCommit(EasyMock.anyObject());
- EasyMock.expectLastCall().andStubReturn(Collections.emptyMap());
- expectConsumerPoll(0);
- sinkTask.put(Collections.emptyList());
- EasyMock.expectLastCall();
-
- // And unpause
- statusListener.onResume(taskId);
- EasyMock.expectLastCall();
- expectConsumerWakeup();
- EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(2);
- INITIAL_ASSIGNMENT.forEach(tp -> {
- consumer.resume(Collections.singleton(tp));
- PowerMock.expectLastCall();
- });
-
- expectConsumerPoll(1);
- expectConversionAndTransformation(1);
- sinkTask.put(EasyMock.anyObject());
- EasyMock.expectLastCall();
-
- PowerMock.replayAll();
-
- workerTask.initialize(TASK_CONFIG);
- workerTask.initializeAndStart();
- workerTask.iteration(); // initial assignment
- workerTask.iteration(); // fetch some data
- workerTask.transitionTo(TargetState.PAUSED);
- time.sleep(10000L);
-
- assertSinkMetricValue("partition-count", 2);
- assertSinkMetricValue("sink-record-read-total", 1.0);
- assertSinkMetricValue("sink-record-send-total", 1.0);
- assertSinkMetricValue("sink-record-active-count", 1.0);
- assertSinkMetricValue("sink-record-active-count-max", 1.0);
- assertSinkMetricValue("sink-record-active-count-avg", 0.333333);
- assertSinkMetricValue("offset-commit-seq-no", 0.0);
- assertSinkMetricValue("offset-commit-completion-rate", 0.0);
- assertSinkMetricValue("offset-commit-completion-total", 0.0);
- assertSinkMetricValue("offset-commit-skip-rate", 0.0);
- assertSinkMetricValue("offset-commit-skip-total", 0.0);
- assertTaskMetricValue("status", "running");
- assertTaskMetricValue("running-ratio", 1.0);
- assertTaskMetricValue("pause-ratio", 0.0);
- assertTaskMetricValue("batch-size-max", 1.0);
- assertTaskMetricValue("batch-size-avg", 0.5);
- assertTaskMetricValue("offset-commit-max-time-ms", Double.NaN);
- assertTaskMetricValue("offset-commit-failure-percentage", 0.0);
- assertTaskMetricValue("offset-commit-success-percentage", 0.0);
-
- workerTask.iteration(); // wakeup
- workerTask.iteration(); // now paused
- time.sleep(30000L);
-
- assertSinkMetricValue("offset-commit-seq-no", 1.0);
- assertSinkMetricValue("offset-commit-completion-rate", 0.0333);
- assertSinkMetricValue("offset-commit-completion-total", 1.0);
- assertSinkMetricValue("offset-commit-skip-rate", 0.0);
- assertSinkMetricValue("offset-commit-skip-total", 0.0);
- assertTaskMetricValue("status", "paused");
- assertTaskMetricValue("running-ratio", 0.25);
- assertTaskMetricValue("pause-ratio", 0.75);
-
- workerTask.transitionTo(TargetState.STARTED);
- workerTask.iteration(); // wakeup
- workerTask.iteration(); // now unpaused
- //printMetrics();
-
- PowerMock.verifyAll();
- }
-
- @Test
- public void testShutdown() throws Exception {
- createTask(initialState);
-
- expectInitializeTask();
- expectTaskGetTopic(true);
-
- // first iteration
- expectPollInitialAssignment();
-
- // second iteration
- EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject())).andReturn(Collections.emptyMap());
- expectConsumerPoll(1);
- expectConversionAndTransformation(1);
- sinkTask.put(EasyMock.anyObject());
- EasyMock.expectLastCall();
-
- // WorkerSinkTask::stop
- consumer.wakeup();
- PowerMock.expectLastCall();
- sinkTask.stop();
- PowerMock.expectLastCall();
-
- EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT);
- // WorkerSinkTask::close
- consumer.close();
- PowerMock.expectLastCall().andAnswer(() -> {
- rebalanceListener.getValue().onPartitionsRevoked(
- INITIAL_ASSIGNMENT
- );
- return null;
- });
- headerConverter.close();
- PowerMock.expectLastCall();
-
- PowerMock.replayAll();
-
- workerTask.initialize(TASK_CONFIG);
- workerTask.initializeAndStart();
- workerTask.iteration();
- sinkTaskContext.getValue().requestCommit(); // Force an offset commit
- workerTask.iteration();
- workerTask.stop();
- workerTask.close();
-
- PowerMock.verifyAll();
- }
-
@Test
public void testPollRedelivery() throws Exception {
createTask(initialState);
@@ -550,152 +379,6 @@ public class WorkerSinkTaskTest {
PowerMock.verifyAll();
}
- @Test
- public void testErrorInRebalancePartitionLoss() throws Exception {
- RuntimeException exception = new RuntimeException("Revocation error");
-
- createTask(initialState);
-
- expectInitializeTask();
- expectTaskGetTopic(true);
- expectPollInitialAssignment();
- expectRebalanceLossError(exception);
-
- PowerMock.replayAll();
-
- workerTask.initialize(TASK_CONFIG);
- workerTask.initializeAndStart();
- workerTask.iteration();
- try {
- workerTask.iteration();
- fail("Poll should have raised the rebalance exception");
- } catch (RuntimeException e) {
- assertEquals(exception, e);
- }
-
- PowerMock.verifyAll();
- }
-
- @Test
- public void testErrorInRebalancePartitionRevocation() throws Exception {
- RuntimeException exception = new RuntimeException("Revocation error");
-
- createTask(initialState);
-
- expectInitializeTask();
- expectTaskGetTopic(true);
- expectPollInitialAssignment();
- expectRebalanceRevocationError(exception);
-
- PowerMock.replayAll();
-
- workerTask.initialize(TASK_CONFIG);
- workerTask.initializeAndStart();
- workerTask.iteration();
- try {
- workerTask.iteration();
- fail("Poll should have raised the rebalance exception");
- } catch (RuntimeException e) {
- assertEquals(exception, e);
- }
-
- PowerMock.verifyAll();
- }
-
- @Test
- public void testErrorInRebalancePartitionAssignment() throws Exception {
- RuntimeException exception = new RuntimeException("Assignment error");
-
- createTask(initialState);
-
- expectInitializeTask();
- expectTaskGetTopic(true);
- expectPollInitialAssignment();
- expectRebalanceAssignmentError(exception);
-
- PowerMock.replayAll();
-
- workerTask.initialize(TASK_CONFIG);
- workerTask.initializeAndStart();
- workerTask.iteration();
- try {
- workerTask.iteration();
- fail("Poll should have raised the rebalance exception");
- } catch (RuntimeException e) {
- assertEquals(exception, e);
- }
-
- PowerMock.verifyAll();
- }
-
- @Test
- public void testPartialRevocationAndAssignment() throws Exception {
- createTask(initialState);
-
- expectInitializeTask();
- expectTaskGetTopic(true);
- expectPollInitialAssignment();
-
- EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
- () -> {
- rebalanceListener.getValue().onPartitionsRevoked(Collections.singleton(TOPIC_PARTITION));
- rebalanceListener.getValue().onPartitionsAssigned(Collections.emptySet());
- return ConsumerRecords.empty();
- });
- EasyMock.expect(consumer.assignment()).andReturn(Collections.singleton(TOPIC_PARTITION)).times(2);
- final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
- offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET));
- sinkTask.preCommit(offsets);
- EasyMock.expectLastCall().andReturn(offsets);
- sinkTask.close(Collections.singleton(TOPIC_PARTITION));
- EasyMock.expectLastCall();
- sinkTask.put(Collections.emptyList());
- EasyMock.expectLastCall();
-
- EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
- () -> {
- rebalanceListener.getValue().onPartitionsRevoked(Collections.emptySet());
- rebalanceListener.getValue().onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION3));
- return ConsumerRecords.empty();
- });
- EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3))).times(2);
- EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(FIRST_OFFSET);
- sinkTask.open(Collections.singleton(TOPIC_PARTITION3));
- EasyMock.expectLastCall();
- sinkTask.put(Collections.emptyList());
- EasyMock.expectLastCall();
-
- EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
- () -> {
- rebalanceListener.getValue().onPartitionsLost(Collections.singleton(TOPIC_PARTITION3));
- rebalanceListener.getValue().onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION));
- return ConsumerRecords.empty();
- });
- EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(3);
- sinkTask.close(Collections.singleton(TOPIC_PARTITION3));
- EasyMock.expectLastCall();
- EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
- sinkTask.open(Collections.singleton(TOPIC_PARTITION));
- EasyMock.expectLastCall();
- sinkTask.put(Collections.emptyList());
- EasyMock.expectLastCall();
-
- PowerMock.replayAll();
-
- workerTask.initialize(TASK_CONFIG);
- workerTask.initializeAndStart();
- // First iteration--first call to poll, first consumer assignment
- workerTask.iteration();
- // Second iteration--second call to poll, partial consumer revocation
- workerTask.iteration();
- // Third iteration--third call to poll, partial consumer assignment
- workerTask.iteration();
- // Fourth iteration--fourth call to poll, one partition lost; can't commit offsets for it, one new partition assigned
- workerTask.iteration();
-
- PowerMock.verifyAll();
- }
-
@Test
public void testPreCommitFailureAfterPartialRevocationAndAssignment() throws Exception {
createTask(initialState);
@@ -1783,67 +1466,6 @@ public class WorkerSinkTaskTest {
PowerMock.verifyAll();
}
- @Test
- public void testMetricsGroup() {
- SinkTaskMetricsGroup group = new SinkTaskMetricsGroup(taskId, metrics);
- SinkTaskMetricsGroup group1 = new SinkTaskMetricsGroup(taskId1, metrics);
- for (int i = 0; i != 10; ++i) {
- group.recordRead(1);
- group.recordSend(2);
- group.recordPut(3);
- group.recordPartitionCount(4);
- group.recordOffsetSequenceNumber(5);
- }
- Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
- committedOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
- group.recordCommittedOffsets(committedOffsets);
- Map<TopicPartition, OffsetAndMetadata> consumedOffsets = new HashMap<>();
- consumedOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 10));
- group.recordConsumedOffsets(consumedOffsets);
-
- for (int i = 0; i != 20; ++i) {
- group1.recordRead(1);
- group1.recordSend(2);
- group1.recordPut(30);
- group1.recordPartitionCount(40);
- group1.recordOffsetSequenceNumber(50);
- }
- committedOffsets = new HashMap<>();
- committedOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET + 2));
- committedOffsets.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET + 3));
- group1.recordCommittedOffsets(committedOffsets);
- consumedOffsets = new HashMap<>();
- consumedOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET + 20));
- consumedOffsets.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET + 30));
- group1.recordConsumedOffsets(consumedOffsets);
-
- assertEquals(0.333, metrics.currentMetricValueAsDouble(group.metricGroup(), "sink-record-read-rate"), 0.001d);
- assertEquals(0.667, metrics.currentMetricValueAsDouble(group.metricGroup(), "sink-record-send-rate"), 0.001d);
- assertEquals(9, metrics.currentMetricValueAsDouble(group.metricGroup(), "sink-record-active-count"), 0.001d);
- assertEquals(4, metrics.currentMetricValueAsDouble(group.metricGroup(), "partition-count"), 0.001d);
- assertEquals(5, metrics.currentMetricValueAsDouble(group.metricGroup(), "offset-commit-seq-no"), 0.001d);
- assertEquals(3, metrics.currentMetricValueAsDouble(group.metricGroup(), "put-batch-max-time-ms"), 0.001d);
-
- // Close the group
- group.close();
-
- for (MetricName metricName : group.metricGroup().metrics().metrics().keySet()) {
- // Metrics for this group should no longer exist
- assertFalse(group.metricGroup().groupId().includes(metricName));
- }
- // Sensors for this group should no longer exist
- assertNull(group.metricGroup().metrics().getSensor("source-record-poll"));
- assertNull(group.metricGroup().metrics().getSensor("source-record-write"));
- assertNull(group.metricGroup().metrics().getSensor("poll-batch-time"));
-
- assertEquals(0.667, metrics.currentMetricValueAsDouble(group1.metricGroup(), "sink-record-read-rate"), 0.001d);
- assertEquals(1.333, metrics.currentMetricValueAsDouble(group1.metricGroup(), "sink-record-send-rate"), 0.001d);
- assertEquals(45, metrics.currentMetricValueAsDouble(group1.metricGroup(), "sink-record-active-count"), 0.001d);
- assertEquals(40, metrics.currentMetricValueAsDouble(group1.metricGroup(), "partition-count"), 0.001d);
- assertEquals(50, metrics.currentMetricValueAsDouble(group1.metricGroup(), "offset-commit-seq-no"), 0.001d);
- assertEquals(30, metrics.currentMetricValueAsDouble(group1.metricGroup(), "put-batch-max-time-ms"), 0.001d);
- }
-
@Test
public void testHeaders() throws Exception {
Headers headers = new RecordHeaders();
@@ -1994,53 +1616,6 @@ public class WorkerSinkTaskTest {
PowerMock.expectLastCall();
}
- private void expectRebalanceLossError(RuntimeException e) {
- sinkTask.close(new HashSet<>(INITIAL_ASSIGNMENT));
- EasyMock.expectLastCall().andThrow(e);
-
- EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
- () -> {
- rebalanceListener.getValue().onPartitionsLost(INITIAL_ASSIGNMENT);
- return ConsumerRecords.empty();
- });
- }
-
- private void expectRebalanceRevocationError(RuntimeException e) {
- sinkTask.close(new HashSet<>(INITIAL_ASSIGNMENT));
- EasyMock.expectLastCall().andThrow(e);
-
- sinkTask.preCommit(EasyMock.anyObject());
- EasyMock.expectLastCall().andReturn(Collections.emptyMap());
-
- EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
- () -> {
- rebalanceListener.getValue().onPartitionsRevoked(INITIAL_ASSIGNMENT);
- return ConsumerRecords.empty();
- });
- }
-
- private void expectRebalanceAssignmentError(RuntimeException e) {
- sinkTask.close(INITIAL_ASSIGNMENT);
- EasyMock.expectLastCall();
-
- sinkTask.preCommit(EasyMock.anyObject());
- EasyMock.expectLastCall().andReturn(Collections.emptyMap());
-
- EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
- EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
-
- sinkTask.open(INITIAL_ASSIGNMENT);
- EasyMock.expectLastCall().andThrow(e);
-
- EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(2);
- EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
- () -> {
- rebalanceListener.getValue().onPartitionsRevoked(INITIAL_ASSIGNMENT);
- rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT);
- return ConsumerRecords.empty();
- });
- }
-
private void expectPollInitialAssignment() {
sinkTask.open(INITIAL_ASSIGNMENT);
EasyMock.expectLastCall();
@@ -2057,12 +1632,6 @@ public class WorkerSinkTaskTest {
EasyMock.expectLastCall();
}
- private void expectConsumerWakeup() {
- consumer.wakeup();
- EasyMock.expectLastCall();
- EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andThrow(new WakeupException());
- }
-
private void expectConsumerPoll(final int numMessages) {
expectConsumerPoll(numMessages, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, emptyHeaders());
}
@@ -2180,63 +1749,6 @@ public class WorkerSinkTaskTest {
assertEquals(expected, measured);
}
- private void printMetrics() {
- System.out.println();
- sinkMetricValue("sink-record-read-rate");
- sinkMetricValue("sink-record-read-total");
- sinkMetricValue("sink-record-send-rate");
- sinkMetricValue("sink-record-send-total");
- sinkMetricValue("sink-record-active-count");
- sinkMetricValue("sink-record-active-count-max");
- sinkMetricValue("sink-record-active-count-avg");
- sinkMetricValue("partition-count");
- sinkMetricValue("offset-commit-seq-no");
- sinkMetricValue("offset-commit-completion-rate");
- sinkMetricValue("offset-commit-completion-total");
- sinkMetricValue("offset-commit-skip-rate");
- sinkMetricValue("offset-commit-skip-total");
- sinkMetricValue("put-batch-max-time-ms");
- sinkMetricValue("put-batch-avg-time-ms");
-
- taskMetricValue("status-unassigned");
- taskMetricValue("status-running");
- taskMetricValue("status-paused");
- taskMetricValue("status-failed");
- taskMetricValue("status-destroyed");
- taskMetricValue("running-ratio");
- taskMetricValue("pause-ratio");
- taskMetricValue("offset-commit-max-time-ms");
- taskMetricValue("offset-commit-avg-time-ms");
- taskMetricValue("batch-size-max");
- taskMetricValue("batch-size-avg");
- taskMetricValue("offset-commit-failure-percentage");
- taskMetricValue("offset-commit-success-percentage");
- }
-
- private double sinkMetricValue(String metricName) {
- MetricGroup sinkTaskGroup = workerTask.sinkTaskMetricsGroup().metricGroup();
- double value = metrics.currentMetricValueAsDouble(sinkTaskGroup, metricName);
- System.out.println("** " + metricName + "=" + value);
- return value;
- }
-
- private double taskMetricValue(String metricName) {
- MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup();
- double value = metrics.currentMetricValueAsDouble(taskGroup, metricName);
- System.out.println("** " + metricName + "=" + value);
- return value;
- }
-
-
- private void assertMetrics(int minimumPollCountExpected) {
- MetricGroup sinkTaskGroup = workerTask.sinkTaskMetricsGroup().metricGroup();
- MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup();
- double readRate = metrics.currentMetricValueAsDouble(sinkTaskGroup, "sink-record-read-rate");
- double readTotal = metrics.currentMetricValueAsDouble(sinkTaskGroup, "sink-record-read-total");
- double sendRate = metrics.currentMetricValueAsDouble(sinkTaskGroup, "sink-record-send-rate");
- double sendTotal = metrics.currentMetricValueAsDouble(sinkTaskGroup, "sink-record-send-total");
- }
-
private RecordHeaders emptyHeaders() {
return new RecordHeaders();
}