You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "yashmayya (via GitHub)" <gi...@apache.org> on 2023/06/06 12:11:51 UTC

[GitHub] [kafka] yashmayya commented on a diff in pull request #13818: KAFKA-14784: Connect offset reset REST API

yashmayya commented on code in PR #13818:
URL: https://github.com/apache/kafka/pull/13818#discussion_r1219524567


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, Connector connector, Map<String,
                 Admin admin = adminFactory.apply(adminConfig);
 
                 try {
-                    List<KafkaFuture<Void>> adminFutures = new ArrayList<>();
-
-                    Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet()
-                            .stream()
-                            .filter(entry -> entry.getValue() != null)
-                            .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
-
-                    if (!offsetsToAlter.isEmpty()) {
-                        log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.",
-                                connName, offsetsToAlter);
-                        AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs(
+                    Map<TopicPartition, Long> offsetsToWrite;
+                    if (isReset) {
+                        offsetsToWrite = new HashMap<>();
+                        ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions().timeoutMs(
                                 (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                        AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter,
-                                alterConsumerGroupOffsetsOptions);
-
-                        adminFutures.add(alterConsumerGroupOffsetsResult.all());
+                        try {
+                            admin.listConsumerGroupOffsets(groupId, listConsumerGroupOffsetsOptions)
+                                    .partitionsToOffsetAndMetadata()
+                                    .get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                                    .forEach((topicPartition, offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null));
+
+                            log.debug("Found the following topic partitions (to reset offsets) for sink connector {} and consumer group ID {}: {}",
+                                    connName, groupId, offsetsToWrite.keySet());
+                        } catch (Exception e) {
+                            Utils.closeQuietly(admin, "Offset reset admin for sink connector " + connName);
+                            log.error("Failed to list offsets prior to resetting sink connector offsets", e);
+                            cb.onCompletion(new ConnectException("Failed to list offsets prior to resetting sink connector offsets", e), null);
+                            return;
+                        }
+                    } else {
+                        offsetsToWrite = SinkUtils.parseSinkConnectorOffsets(offsets);
                     }
 
-                    Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet()
-                            .stream()
-                            .filter(entry -> entry.getValue() == null)
-                            .map(Map.Entry::getKey)
-                            .collect(Collectors.toSet());
-
-                    if (!partitionsToReset.isEmpty()) {
-                        log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.",
-                                connName, partitionsToReset);
-                        DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs(
-                                (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                        DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset,
-                                deleteConsumerGroupOffsetsOptions);
+                    boolean alterOffsetsResult;
+                    try {
+                        alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, offsetsToWrite);
+                    } catch (UnsupportedOperationException e) {
+                        throw new ConnectException("Failed to modify offsets for connector " + connName + " because it doesn't support external " +
+                                "modification of offsets", e);
+                    }
 
-                        adminFutures.add(deleteConsumerGroupOffsetsResult.all());
+                    // This should only occur for an offset reset request when:
+                    // 1. There was a prior attempt to reset offsets
+                    // OR
+                    // 2. No offsets have been committed yet
+                    if (offsetsToWrite.isEmpty()) {

Review Comment:
   I'm wondering whether we should go ahead and attempt to delete the consumer group even in this case, in order to avoid inconsistency (if no offsets have been committed for the group yet, the current implementation won't delete the consumer group). The only minor downside is that we'd need special case handling for `GroupIdNotFoundException`s arising from calls to `Admin::deleteConsumerGroups` (interestingly `Admin::listConsumerGroupOffsets` doesn't result in an exception for non-existent groups, but an empty partition offset map).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org