You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "C0urante (via GitHub)" <gi...@apache.org> on 2023/05/09 21:11:42 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API

C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1188741900


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1528,6 +1539,80 @@ public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) {
         );
     }
 
+    @Override
+    public void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> callback) {
+        log.trace("Submitting alter offsets request for connector '{}'", connName);
+
+        addRequest(() -> {
+            refreshConfigSnapshot(workerSyncTimeoutMs);
+            if (!alterConnectorOffsetsChecks(connName, callback)) {
+                return null;
+            }
+            // At this point, we should be the leader (the call to alterConnectorOffsetsChecks makes sure of that) and can safely run
+            // a zombie fencing request
+            if (isSourceConnector(connName) && config.exactlyOnceSourceEnabled()) {
+                log.debug("Performing a round of zombie fencing before altering offsets for source connector {} with exactly-once semantics enabled.", connName);
+                getFenceZombieSourceTasksCallable(connName, (error, ignored) -> {
+                    if (error != null) {
+                        log.error("Failed to perform zombie fencing for exactly-once source connector prior to altering offsets", error);

Review Comment:
   Similar concerns about wording:
   ```suggestion
                           log.error("Failed to perform zombie fencing for source connector prior to altering offsets", error);
   ```
   
   We could also say "source connector with exactly-once support enabled" but that seems unnecessarily verbose.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1528,6 +1539,80 @@ public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) {
         );
     }
 
+    @Override
+    public void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> callback) {
+        log.trace("Submitting alter offsets request for connector '{}'", connName);
+
+        addRequest(() -> {
+            refreshConfigSnapshot(workerSyncTimeoutMs);
+            if (!alterConnectorOffsetsChecks(connName, callback)) {
+                return null;
+            }
+            // At this point, we should be the leader (the call to alterConnectorOffsetsChecks makes sure of that) and can safely run
+            // a zombie fencing request
+            if (isSourceConnector(connName) && config.exactlyOnceSourceEnabled()) {
+                log.debug("Performing a round of zombie fencing before altering offsets for source connector {} with exactly-once semantics enabled.", connName);
+                getFenceZombieSourceTasksCallable(connName, (error, ignored) -> {
+                    if (error != null) {
+                        log.error("Failed to perform zombie fencing for exactly-once source connector prior to altering offsets", error);
+                        callback.onCompletion(new ConnectException("Failed to perform zombie fencing for exactly-once source connector prior to altering offsets",
+                                error), null);
+                    } else {
+                        log.debug("Successfully completed zombie fencing for source connector {}; proceeding to alter offsets.", connName);
+                        // We need to ensure that we perform the necessary checks again inside alterConnectorOffsetsHerderRequest
+                        // since it is being run in a separate herder request and the conditions could have changed since the
+                        // previous check
+                        addRequest(getAlterConnectorOffsetsCallable(connName, offsets, callback), forwardErrorCallback(callback));
+                    }
+                }).call();
+            } else {
+                getAlterConnectorOffsetsCallable(connName, offsets, callback).call();

Review Comment:
   Same thought here; it's counterintuitive to have a method return a `Callable` just to immediately call it. We can (should) just have synchronous variants instead, and if we still need a way to wrap those in a `Callable`, we can piggyback off of the synchronous one.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1528,6 +1539,80 @@ public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) {
         );
     }
 
+    @Override
+    public void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> callback) {
+        log.trace("Submitting alter offsets request for connector '{}'", connName);
+
+        addRequest(() -> {
+            refreshConfigSnapshot(workerSyncTimeoutMs);

Review Comment:
   Shouldn't we handle the case where this method returns `false`?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetS
         });
     }
 
+    /**
+     * Alter a connector's offsets.
+     *
+     * @param connName the name of the connector whose offsets are to be altered
+     * @param connectorConfig the connector's configurations
+     * @param offsets  a mapping from partitions to offsets that need to be overwritten
+     * @param cb callback to invoke upon completion
+     */
+    public void alterConnectorOffsets(String connName, Map<String, String> connectorConfig,
+                                      Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+            if (ConnectUtils.isSinkConnector(connector)) {
+                log.debug("Altering consumer group offsets for sink connector: {}", connName);
+                alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            } else {
+                log.debug("Altering offsets for source connector: {}", connName);
+                alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            }
+        }
+    }
+
+    /**
+     * Alter a sink connector's consumer group offsets.
+     * <p>
+     * Visible for testing.
+     *
+     * @param connName the name of the sink connector whose offsets are to be altered
+     * @param connector an instance of the sink connector
+     * @param connectorConfig the sink connector's configuration
+     * @param offsets a mapping from topic partitions to offsets that need to be overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                   Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                Map<TopicPartition, Long> parsedOffsets = SinkUtils.validateAndParseSinkConnectorOffsets(offsets);
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, parsedOffsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                Class<? extends Connector> sinkConnectorClass = connector.getClass();
+                Map<String, Object> adminConfig = adminConfigs(
+                        connName,
+                        "connector-worker-adminclient-" + connName,
+                        config,
+                        new SinkConnectorConfig(plugins, connectorConfig),
+                        sinkConnectorClass,
+                        connectorClientConfigOverridePolicy,
+                        kafkaClusterId,
+                        ConnectorType.SINK);
+
+                SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig);
+                String groupId = (String) baseConsumerConfigs(
+                        connName, "connector-consumer-", config, sinkConnectorConfig,
+                        sinkConnectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+
+                Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() != null)
+                        .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), new OffsetAndMetadata(entry.getValue())))
+                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+                Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() == null)
+                        .map(Map.Entry::getKey)
+                        .collect(Collectors.toSet());
+
+                KafkaFuture<Void> adminFuture = KafkaFuture.completedFuture(null);
+
+                Admin admin = adminFactory.apply(adminConfig);
+
+                try {
+                    if (!offsetsToAlter.isEmpty()) {
+                        log.debug("Committing the following consumer group topic partition offsets using an admin client for sink connector {}: {}.",
+                                connName, offsetsToAlter);
+                        AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs(
+                                (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+                        AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter,
+                                alterConsumerGroupOffsetsOptions);
+
+                        adminFuture = alterConsumerGroupOffsetsResult.all();
+                    }
+
+                    adminFuture.whenComplete((ignored, error) -> {
+                        if (error != null) {
+                            Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                            // When a consumer group is non-empty, only group members can commit offsets. The above attempt to alter offsets via the admin
+                            // client will result in an UnknownMemberIdException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped
+                            // completely or if the connector is resumed while the alter offsets request is being processed)
+                            if (error instanceof UnknownMemberIdException) {
+                                cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " +
+                                                "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " +
+                                                "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " +
+                                                "Connect cluster may need to be restarted to get rid of the zombie sink tasks."),
+                                        null);
+                            } else {
+                                cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for topic partitions " + offsetsToAlter.keySet() + " for " +
+                                                "connector " + connName, error),
+                                        null);
+                            }
+                        } else if (!partitionsToReset.isEmpty()) {
+                            log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.",
+                                    connName, partitionsToReset);
+                            DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs(
+                                    (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+                            DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset,
+                                    deleteConsumerGroupOffsetsOptions);
+                            deleteConsumerGroupOffsetsResult.all().whenComplete((ignored2, error2) -> {
+                                Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                                if (error2 != null) {
+                                    // The attempt to delete offsets for certain topic partitions via the admin client will result in a
+                                    // GroupSubscribedToTopicException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped completely
+                                    // or if the connector is resumed while the alter offsets request is being processed).
+                                    if (error2 instanceof GroupSubscribedToTopicException) {
+                                        cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " +
+                                                        "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " +
+                                                        "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " +
+                                                        "Connect cluster may need to be restarted to get rid of the zombie sink tasks."),
+                                                null);
+                                    } else {
+                                        cb.onCompletion(new ConnectException("Failed to delete consumer group offsets for topic partitions " + partitionsToReset + " for connector "
+                                                        + connName, error2),
+                                                null);
+                                    }
+                                } else {
+                                    completeAlterOffsetsCallback(alterOffsetsResult, cb);
+                                }
+                            });
+                        } else {
+                            Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                            completeAlterOffsetsCallback(alterOffsetsResult, cb);
+                        }
+                    });
+                } catch (Throwable t) {
+                    Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                    throw t;
+                }
+            } catch (Throwable t) {
+                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to alter offsets for sink connector " + connName), null);
+            }
+        }));
+    }
+
+    /**
+     * Alter a source connector's offsets.
+     *
+     * @param connName the name of the source connector whose offsets are to be altered
+     * @param connector an instance of the source connector
+     * @param connectorConfig the source connector's configuration
+     * @param offsets a mapping from partitions to offsets that need to be overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    private void alterSourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                             Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connectorConfig, config.topicCreationEnable());
+        Map<String, Object> producerProps = config.exactlyOnceSourceEnabled()
+                ? exactlyOnceSourceTaskProducerConfigs(new ConnectorTaskId(connName, 0), config, sourceConfig,
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId)
+                : baseProducerConfigs(connName, "connector-offset-producer-" + connName, config, sourceConfig,
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId);
+        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
+
+        ConnectorOffsetBackingStore offsetStore = config.exactlyOnceSourceEnabled()
+                ? offsetStoreForExactlyOnceSourceConnector(sourceConfig, connName, connector, producer)
+                : offsetStoreForRegularSourceConnector(sourceConfig, connName, connector, producer);
+
+        OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, connName, internalKeyConverter, internalValueConverter);
+        alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, offsetStore, producer, offsetWriter, connectorLoader, cb);
+    }
+
+    // Visible for testing
+    void alterSourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                     Map<Map<String, ?>, Map<String, ?>> offsets, ConnectorOffsetBackingStore offsetStore,
+                                     KafkaProducer<byte[], byte[]> producer, OffsetStorageWriter offsetWriter,
+                                     ClassLoader connectorLoader, Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SourceConnector) connector).alterOffsets(connectorConfig, offsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                offsetStore.configure(config);
+                // This reads to the end of the offsets topic and can be a potentially time-consuming operation
+                offsetStore.start();
+
+                // The alterSourceConnectorOffsets method should only be called after all the connector's tasks have been stopped, and it's
+                // safe to write offsets via an offset writer
+                offsets.forEach(offsetWriter::offset);
+
+                // We can call begin flush without a timeout because this newly created single-purpose offset writer can't do concurrent
+                // offset writes. We can also ignore the return value since it returns false if and only if there is no data to be flushed,
+                // and we've just put some data in the previous statement

Review Comment:
   Are we guaranteed that `offsets` is non-empty here?
   
   I guess we do that check in `ConnectorsResource` but it might be nice to reiterate that expectation in the Javadocs for the various offset-alter methods that `offsets` may be neither null nor empty, and possibly even add an explicit check for that.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetS
         });
     }
 
+    /**
+     * Alter a connector's offsets.
+     *
+     * @param connName the name of the connector whose offsets are to be altered
+     * @param connectorConfig the connector's configurations
+     * @param offsets  a mapping from partitions to offsets that need to be overwritten
+     * @param cb callback to invoke upon completion
+     */
+    public void alterConnectorOffsets(String connName, Map<String, String> connectorConfig,
+                                      Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+            if (ConnectUtils.isSinkConnector(connector)) {
+                log.debug("Altering consumer group offsets for sink connector: {}", connName);
+                alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            } else {
+                log.debug("Altering offsets for source connector: {}", connName);
+                alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            }
+        }
+    }
+
+    /**
+     * Alter a sink connector's consumer group offsets.
+     * <p>
+     * Visible for testing.
+     *
+     * @param connName the name of the sink connector whose offsets are to be altered
+     * @param connector an instance of the sink connector
+     * @param connectorConfig the sink connector's configuration
+     * @param offsets a mapping from topic partitions to offsets that need to be overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                   Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                Map<TopicPartition, Long> parsedOffsets = SinkUtils.validateAndParseSinkConnectorOffsets(offsets);
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, parsedOffsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                Class<? extends Connector> sinkConnectorClass = connector.getClass();
+                Map<String, Object> adminConfig = adminConfigs(
+                        connName,
+                        "connector-worker-adminclient-" + connName,
+                        config,
+                        new SinkConnectorConfig(plugins, connectorConfig),
+                        sinkConnectorClass,
+                        connectorClientConfigOverridePolicy,
+                        kafkaClusterId,
+                        ConnectorType.SINK);
+
+                SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig);
+                String groupId = (String) baseConsumerConfigs(
+                        connName, "connector-consumer-", config, sinkConnectorConfig,
+                        sinkConnectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+
+                Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() != null)
+                        .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), new OffsetAndMetadata(entry.getValue())))
+                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+                Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() == null)
+                        .map(Map.Entry::getKey)
+                        .collect(Collectors.toSet());
+
+                KafkaFuture<Void> adminFuture = KafkaFuture.completedFuture(null);
+
+                Admin admin = adminFactory.apply(adminConfig);
+
+                try {
+                    if (!offsetsToAlter.isEmpty()) {
+                        log.debug("Committing the following consumer group topic partition offsets using an admin client for sink connector {}: {}.",
+                                connName, offsetsToAlter);
+                        AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs(
+                                (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+                        AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter,
+                                alterConsumerGroupOffsetsOptions);
+
+                        adminFuture = alterConsumerGroupOffsetsResult.all();
+                    }
+
+                    adminFuture.whenComplete((ignored, error) -> {
+                        if (error != null) {
+                            Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                            // When a consumer group is non-empty, only group members can commit offsets. The above attempt to alter offsets via the admin
+                            // client will result in an UnknownMemberIdException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped
+                            // completely or if the connector is resumed while the alter offsets request is being processed)
+                            if (error instanceof UnknownMemberIdException) {
+                                cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " +
+                                                "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " +
+                                                "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " +
+                                                "Connect cluster may need to be restarted to get rid of the zombie sink tasks."),
+                                        null);
+                            } else {
+                                cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for topic partitions " + offsetsToAlter.keySet() + " for " +
+                                                "connector " + connName, error),
+                                        null);
+                            }
+                        } else if (!partitionsToReset.isEmpty()) {
+                            log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.",
+                                    connName, partitionsToReset);
+                            DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs(
+                                    (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+                            DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset,
+                                    deleteConsumerGroupOffsetsOptions);
+                            deleteConsumerGroupOffsetsResult.all().whenComplete((ignored2, error2) -> {
+                                Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                                if (error2 != null) {
+                                    // The attempt to delete offsets for certain topic partitions via the admin client will result in a
+                                    // GroupSubscribedToTopicException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped completely
+                                    // or if the connector is resumed while the alter offsets request is being processed).
+                                    if (error2 instanceof GroupSubscribedToTopicException) {
+                                        cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " +
+                                                        "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " +
+                                                        "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " +
+                                                        "Connect cluster may need to be restarted to get rid of the zombie sink tasks."),
+                                                null);
+                                    } else {
+                                        cb.onCompletion(new ConnectException("Failed to delete consumer group offsets for topic partitions " + partitionsToReset + " for connector "
+                                                        + connName, error2),
+                                                null);
+                                    }
+                                } else {
+                                    completeAlterOffsetsCallback(alterOffsetsResult, cb);
+                                }
+                            });
+                        } else {
+                            Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                            completeAlterOffsetsCallback(alterOffsetsResult, cb);
+                        }
+                    });
+                } catch (Throwable t) {
+                    Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                    throw t;
+                }
+            } catch (Throwable t) {
+                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to alter offsets for sink connector " + connName), null);
+            }
+        }));
+    }
+
+    /**
+     * Alter a source connector's offsets.
+     *
+     * @param connName the name of the source connector whose offsets are to be altered
+     * @param connector an instance of the source connector
+     * @param connectorConfig the source connector's configuration
+     * @param offsets a mapping from partitions to offsets that need to be overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    private void alterSourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                             Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connectorConfig, config.topicCreationEnable());
+        Map<String, Object> producerProps = config.exactlyOnceSourceEnabled()
+                ? exactlyOnceSourceTaskProducerConfigs(new ConnectorTaskId(connName, 0), config, sourceConfig,
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId)
+                : baseProducerConfigs(connName, "connector-offset-producer-" + connName, config, sourceConfig,
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId);
+        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
+
+        ConnectorOffsetBackingStore offsetStore = config.exactlyOnceSourceEnabled()
+                ? offsetStoreForExactlyOnceSourceConnector(sourceConfig, connName, connector, producer)
+                : offsetStoreForRegularSourceConnector(sourceConfig, connName, connector, producer);
+
+        OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, connName, internalKeyConverter, internalValueConverter);
+        alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, offsetStore, producer, offsetWriter, connectorLoader, cb);
+    }
+
+    // Visible for testing
+    void alterSourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                     Map<Map<String, ?>, Map<String, ?>> offsets, ConnectorOffsetBackingStore offsetStore,
+                                     KafkaProducer<byte[], byte[]> producer, OffsetStorageWriter offsetWriter,
+                                     ClassLoader connectorLoader, Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SourceConnector) connector).alterOffsets(connectorConfig, offsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                offsetStore.configure(config);
+                // This reads to the end of the offsets topic and can be a potentially time-consuming operation
+                offsetStore.start();
+
+                // The alterSourceConnectorOffsets method should only be called after all the connector's tasks have been stopped, and it's
+                // safe to write offsets via an offset writer
+                offsets.forEach(offsetWriter::offset);
+
+                // We can call begin flush without a timeout because this newly created single-purpose offset writer can't do concurrent
+                // offset writes. We can also ignore the return value since it returns false if and only if there is no data to be flushed,
+                // and we've just put some data in the previous statement
+                offsetWriter.beginFlush();
+
+                try {
+                    if (config.exactlyOnceSourceEnabled()) {
+                        producer.initTransactions();
+                        producer.beginTransaction();
+                    }
+                    log.debug("Committing the following partition offsets for source connector {}: {}", connName, offsets);

Review Comment:
   Nit: "partition" doesn't really add anything here. I was considering "source" instead, but that's redundant since we already clarify that it's a source connector.
   ```suggestion
                       log.debug("Committing the following offsets for source connector {}: {}", connName, offsets);
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetS
         });
     }
 
+    /**
+     * Alter a connector's offsets.
+     *
+     * @param connName the name of the connector whose offsets are to be altered
+     * @param connectorConfig the connector's configurations
+     * @param offsets  a mapping from partitions to offsets that need to be overwritten
+     * @param cb callback to invoke upon completion
+     */
+    public void alterConnectorOffsets(String connName, Map<String, String> connectorConfig,
+                                      Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+            if (ConnectUtils.isSinkConnector(connector)) {
+                log.debug("Altering consumer group offsets for sink connector: {}", connName);
+                alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            } else {
+                log.debug("Altering offsets for source connector: {}", connName);
+                alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            }
+        }
+    }
+
+    /**
+     * Alter a sink connector's consumer group offsets.
+     * <p>
+     * Visible for testing.
+     *
+     * @param connName the name of the sink connector whose offsets are to be altered
+     * @param connector an instance of the sink connector
+     * @param connectorConfig the sink connector's configuration
+     * @param offsets a mapping from topic partitions to offsets that need to be overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                   Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                Map<TopicPartition, Long> parsedOffsets = SinkUtils.validateAndParseSinkConnectorOffsets(offsets);
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, parsedOffsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                Class<? extends Connector> sinkConnectorClass = connector.getClass();
+                Map<String, Object> adminConfig = adminConfigs(
+                        connName,
+                        "connector-worker-adminclient-" + connName,
+                        config,
+                        new SinkConnectorConfig(plugins, connectorConfig),
+                        sinkConnectorClass,
+                        connectorClientConfigOverridePolicy,
+                        kafkaClusterId,
+                        ConnectorType.SINK);
+
+                SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig);
+                String groupId = (String) baseConsumerConfigs(
+                        connName, "connector-consumer-", config, sinkConnectorConfig,
+                        sinkConnectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+
+                Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() != null)
+                        .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), new OffsetAndMetadata(entry.getValue())))
+                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+                Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() == null)
+                        .map(Map.Entry::getKey)
+                        .collect(Collectors.toSet());
+
+                KafkaFuture<Void> adminFuture = KafkaFuture.completedFuture(null);
+
+                Admin admin = adminFactory.apply(adminConfig);
+
+                try {
+                    if (!offsetsToAlter.isEmpty()) {
+                        log.debug("Committing the following consumer group topic partition offsets using an admin client for sink connector {}: {}.",
+                                connName, offsetsToAlter);
+                        AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs(
+                                (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+                        AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter,
+                                alterConsumerGroupOffsetsOptions);
+
+                        adminFuture = alterConsumerGroupOffsetsResult.all();
+                    }
+
+                    adminFuture.whenComplete((ignored, error) -> {
+                        if (error != null) {
+                            Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                            // When a consumer group is non-empty, only group members can commit offsets. The above attempt to alter offsets via the admin
+                            // client will result in an UnknownMemberIdException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped
+                            // completely or if the connector is resumed while the alter offsets request is being processed)
+                            if (error instanceof UnknownMemberIdException) {
+                                cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " +
+                                                "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " +
+                                                "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " +
+                                                "Connect cluster may need to be restarted to get rid of the zombie sink tasks."),
+                                        null);
+                            } else {
+                                cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for topic partitions " + offsetsToAlter.keySet() + " for " +
+                                                "connector " + connName, error),
+                                        null);
+                            }
+                        } else if (!partitionsToReset.isEmpty()) {
+                            log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.",
+                                    connName, partitionsToReset);
+                            DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs(
+                                    (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+                            DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset,
+                                    deleteConsumerGroupOffsetsOptions);
+                            deleteConsumerGroupOffsetsResult.all().whenComplete((ignored2, error2) -> {
+                                Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                                if (error2 != null) {
+                                    // The attempt to delete offsets for certain topic partitions via the admin client will result in a
+                                    // GroupSubscribedToTopicException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped completely
+                                    // or if the connector is resumed while the alter offsets request is being processed).
+                                    if (error2 instanceof GroupSubscribedToTopicException) {
+                                        cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " +
+                                                        "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " +
+                                                        "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " +
+                                                        "Connect cluster may need to be restarted to get rid of the zombie sink tasks."),
+                                                null);
+                                    } else {
+                                        cb.onCompletion(new ConnectException("Failed to delete consumer group offsets for topic partitions " + partitionsToReset + " for connector "
+                                                        + connName, error2),
+                                                null);
+                                    }
+                                } else {
+                                    completeAlterOffsetsCallback(alterOffsetsResult, cb);
+                                }
+                            });
+                        } else {
+                            Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                            completeAlterOffsetsCallback(alterOffsetsResult, cb);
+                        }
+                    });
+                } catch (Throwable t) {
+                    Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                    throw t;
+                }
+            } catch (Throwable t) {
+                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to alter offsets for sink connector " + connName), null);
+            }
+        }));
+    }
+
+    /**
+     * Alter a source connector's offsets.
+     *
+     * @param connName the name of the source connector whose offsets are to be altered
+     * @param connector an instance of the source connector
+     * @param connectorConfig the source connector's configuration
+     * @param offsets a mapping from partitions to offsets that need to be overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    private void alterSourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                             Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connectorConfig, config.topicCreationEnable());
+        Map<String, Object> producerProps = config.exactlyOnceSourceEnabled()
+                ? exactlyOnceSourceTaskProducerConfigs(new ConnectorTaskId(connName, 0), config, sourceConfig,
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId)
+                : baseProducerConfigs(connName, "connector-offset-producer-" + connName, config, sourceConfig,
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId);
+        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
+
+        ConnectorOffsetBackingStore offsetStore = config.exactlyOnceSourceEnabled()
+                ? offsetStoreForExactlyOnceSourceConnector(sourceConfig, connName, connector, producer)
+                : offsetStoreForRegularSourceConnector(sourceConfig, connName, connector, producer);
+
+        OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, connName, internalKeyConverter, internalValueConverter);
+        alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, offsetStore, producer, offsetWriter, connectorLoader, cb);
+    }
+
+    // Visible for testing
+    void alterSourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                     Map<Map<String, ?>, Map<String, ?>> offsets, ConnectorOffsetBackingStore offsetStore,
+                                     KafkaProducer<byte[], byte[]> producer, OffsetStorageWriter offsetWriter,
+                                     ClassLoader connectorLoader, Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SourceConnector) connector).alterOffsets(connectorConfig, offsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                offsetStore.configure(config);
+                // This reads to the end of the offsets topic and can be a potentially time-consuming operation
+                offsetStore.start();
+
+                // The alterSourceConnectorOffsets method should only be called after all the connector's tasks have been stopped, and it's
+                // safe to write offsets via an offset writer
+                offsets.forEach(offsetWriter::offset);
+
+                // We can call begin flush without a timeout because this newly created single-purpose offset writer can't do concurrent
+                // offset writes. We can also ignore the return value since it returns false if and only if there is no data to be flushed,
+                // and we've just put some data in the previous statement
+                offsetWriter.beginFlush();
+
+                try {
+                    if (config.exactlyOnceSourceEnabled()) {
+                        producer.initTransactions();
+                        producer.beginTransaction();
+                    }
+                    log.debug("Committing the following partition offsets for source connector {}: {}", connName, offsets);
+                    FutureCallback<Void> offsetWriterCallback = new FutureCallback<>();
+                    offsetWriter.doFlush(offsetWriterCallback);
+                    if (config.exactlyOnceSourceEnabled()) {
+                        producer.commitTransaction();
+                    }
+                    offsetWriterCallback.get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+                } catch (ExecutionException e) {
+                    throw new ConnectException("Failed to alter offsets for source connector " + connName, e.getCause());

Review Comment:
   We include the set of to-be-altered partitions in the error messages for sink connectors. Can/should we do that for source connectors as well?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1528,6 +1539,80 @@ public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) {
         );
     }
 
+    @Override
+    public void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> callback) {
+        log.trace("Submitting alter offsets request for connector '{}'", connName);
+
+        addRequest(() -> {
+            refreshConfigSnapshot(workerSyncTimeoutMs);

Review Comment:
   Also, is it wise to force a read to the end of the config topic when we may not even be the leader of the cluster? Feels like we should do that check before the read-to-end.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1528,6 +1539,80 @@ public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) {
         );
     }
 
+    @Override
+    public void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> callback) {
+        log.trace("Submitting alter offsets request for connector '{}'", connName);
+
+        addRequest(() -> {
+            refreshConfigSnapshot(workerSyncTimeoutMs);
+            if (!alterConnectorOffsetsChecks(connName, callback)) {
+                return null;
+            }
+            // At this point, we should be the leader (the call to alterConnectorOffsetsChecks makes sure of that) and can safely run
+            // a zombie fencing request
+            if (isSourceConnector(connName) && config.exactlyOnceSourceEnabled()) {
+                log.debug("Performing a round of zombie fencing before altering offsets for source connector {} with exactly-once semantics enabled.", connName);
+                getFenceZombieSourceTasksCallable(connName, (error, ignored) -> {
+                    if (error != null) {
+                        log.error("Failed to perform zombie fencing for exactly-once source connector prior to altering offsets", error);
+                        callback.onCompletion(new ConnectException("Failed to perform zombie fencing for exactly-once source connector prior to altering offsets",
+                                error), null);
+                    } else {
+                        log.debug("Successfully completed zombie fencing for source connector {}; proceeding to alter offsets.", connName);
+                        // We need to ensure that we perform the necessary checks again inside alterConnectorOffsetsHerderRequest

Review Comment:
   Does `alterConnectorOffsetsHerderRequest` need to be updated? This is the only place in the class where that name is used.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetS
         });
     }
 
+    /**
+     * Alter a connector's offsets.
+     *
+     * @param connName the name of the connector whose offsets are to be altered
+     * @param connectorConfig the connector's configurations
+     * @param offsets  a mapping from partitions to offsets that need to be overwritten

Review Comment:
   Nits:
   
   1. Might help to be explicit about what "partitions" refer to here
   2. We aren't necessarily overwriting here (it's possible that no offset exists for one or more of the specified partitions)
   
   ```suggestion
        * @param offsets  a mapping from partitions (either source partitions, or Kafka topic partitions) to offsets that need to be written
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1528,6 +1539,80 @@ public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) {
         );
     }
 
+    @Override
+    public void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> callback) {
+        log.trace("Submitting alter offsets request for connector '{}'", connName);
+
+        addRequest(() -> {
+            refreshConfigSnapshot(workerSyncTimeoutMs);
+            if (!alterConnectorOffsetsChecks(connName, callback)) {
+                return null;
+            }
+            // At this point, we should be the leader (the call to alterConnectorOffsetsChecks makes sure of that) and can safely run
+            // a zombie fencing request
+            if (isSourceConnector(connName) && config.exactlyOnceSourceEnabled()) {
+                log.debug("Performing a round of zombie fencing before altering offsets for source connector {} with exactly-once semantics enabled.", connName);

Review Comment:
   Nit: we say "support" instead of "semantics" for source connectors in Kafka Connect since it's still possible for connectors to not be exactly-once if they don't define meaningful source offsets.
   ```suggestion
                   log.debug("Performing a round of zombie fencing before altering offsets for source connector {} with exactly-once support enabled.", connName);
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1528,6 +1539,80 @@ public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) {
         );
     }
 
+    @Override
+    public void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> callback) {
+        log.trace("Submitting alter offsets request for connector '{}'", connName);
+
+        addRequest(() -> {
+            refreshConfigSnapshot(workerSyncTimeoutMs);
+            if (!alterConnectorOffsetsChecks(connName, callback)) {
+                return null;
+            }
+            // At this point, we should be the leader (the call to alterConnectorOffsetsChecks makes sure of that) and can safely run
+            // a zombie fencing request
+            if (isSourceConnector(connName) && config.exactlyOnceSourceEnabled()) {
+                log.debug("Performing a round of zombie fencing before altering offsets for source connector {} with exactly-once semantics enabled.", connName);
+                getFenceZombieSourceTasksCallable(connName, (error, ignored) -> {
+                    if (error != null) {
+                        log.error("Failed to perform zombie fencing for exactly-once source connector prior to altering offsets", error);
+                        callback.onCompletion(new ConnectException("Failed to perform zombie fencing for exactly-once source connector prior to altering offsets",
+                                error), null);
+                    } else {
+                        log.debug("Successfully completed zombie fencing for source connector {}; proceeding to alter offsets.", connName);
+                        // We need to ensure that we perform the necessary checks again inside alterConnectorOffsetsHerderRequest
+                        // since it is being run in a separate herder request and the conditions could have changed since the

Review Comment:
   It's not that it's being run in a separate herder request, it's that zombie fencing takes place asynchronously (i.e., off of the herder tick thread).



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1528,6 +1539,80 @@ public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) {
         );
     }
 
+    @Override
+    public void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> callback) {
+        log.trace("Submitting alter offsets request for connector '{}'", connName);
+
+        addRequest(() -> {
+            refreshConfigSnapshot(workerSyncTimeoutMs);
+            if (!alterConnectorOffsetsChecks(connName, callback)) {
+                return null;
+            }
+            // At this point, we should be the leader (the call to alterConnectorOffsetsChecks makes sure of that) and can safely run
+            // a zombie fencing request
+            if (isSourceConnector(connName) && config.exactlyOnceSourceEnabled()) {
+                log.debug("Performing a round of zombie fencing before altering offsets for source connector {} with exactly-once semantics enabled.", connName);
+                getFenceZombieSourceTasksCallable(connName, (error, ignored) -> {
+                    if (error != null) {
+                        log.error("Failed to perform zombie fencing for exactly-once source connector prior to altering offsets", error);
+                        callback.onCompletion(new ConnectException("Failed to perform zombie fencing for exactly-once source connector prior to altering offsets",
+                                error), null);
+                    } else {
+                        log.debug("Successfully completed zombie fencing for source connector {}; proceeding to alter offsets.", connName);
+                        // We need to ensure that we perform the necessary checks again inside alterConnectorOffsetsHerderRequest
+                        // since it is being run in a separate herder request and the conditions could have changed since the
+                        // previous check
+                        addRequest(getAlterConnectorOffsetsCallable(connName, offsets, callback), forwardErrorCallback(callback));
+                    }
+                }).call();

Review Comment:
   This is a little odd... could we have a method to synchronously do a round of zombie fencing and invoke that directly here (moving the callback logic to after that invocation)?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetS
         });
     }
 
+    /**
+     * Alter a connector's offsets.
+     *
+     * @param connName the name of the connector whose offsets are to be altered
+     * @param connectorConfig the connector's configurations
+     * @param offsets  a mapping from partitions to offsets that need to be overwritten
+     * @param cb callback to invoke upon completion
+     */
+    public void alterConnectorOffsets(String connName, Map<String, String> connectorConfig,
+                                      Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+            if (ConnectUtils.isSinkConnector(connector)) {
+                log.debug("Altering consumer group offsets for sink connector: {}", connName);
+                alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            } else {
+                log.debug("Altering offsets for source connector: {}", connName);
+                alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            }
+        }
+    }
+
+    /**
+     * Alter a sink connector's consumer group offsets.
+     * <p>
+     * Visible for testing.
+     *
+     * @param connName the name of the sink connector whose offsets are to be altered
+     * @param connector an instance of the sink connector
+     * @param connectorConfig the sink connector's configuration
+     * @param offsets a mapping from topic partitions to offsets that need to be overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                   Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                Map<TopicPartition, Long> parsedOffsets = SinkUtils.validateAndParseSinkConnectorOffsets(offsets);
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, parsedOffsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                Class<? extends Connector> sinkConnectorClass = connector.getClass();
+                Map<String, Object> adminConfig = adminConfigs(
+                        connName,
+                        "connector-worker-adminclient-" + connName,
+                        config,
+                        new SinkConnectorConfig(plugins, connectorConfig),
+                        sinkConnectorClass,
+                        connectorClientConfigOverridePolicy,
+                        kafkaClusterId,
+                        ConnectorType.SINK);
+
+                SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig);

Review Comment:
   We just created a new instance of this class to pass to `adminConfigs`; can we save that one and use it both there and here?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1528,6 +1539,80 @@ public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) {
         );
     }
 
+    @Override
+    public void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> callback) {
+        log.trace("Submitting alter offsets request for connector '{}'", connName);
+
+        addRequest(() -> {
+            refreshConfigSnapshot(workerSyncTimeoutMs);
+            if (!alterConnectorOffsetsChecks(connName, callback)) {
+                return null;
+            }
+            // At this point, we should be the leader (the call to alterConnectorOffsetsChecks makes sure of that) and can safely run
+            // a zombie fencing request
+            if (isSourceConnector(connName) && config.exactlyOnceSourceEnabled()) {
+                log.debug("Performing a round of zombie fencing before altering offsets for source connector {} with exactly-once semantics enabled.", connName);
+                getFenceZombieSourceTasksCallable(connName, (error, ignored) -> {
+                    if (error != null) {
+                        log.error("Failed to perform zombie fencing for exactly-once source connector prior to altering offsets", error);
+                        callback.onCompletion(new ConnectException("Failed to perform zombie fencing for exactly-once source connector prior to altering offsets",
+                                error), null);
+                    } else {
+                        log.debug("Successfully completed zombie fencing for source connector {}; proceeding to alter offsets.", connName);
+                        // We need to ensure that we perform the necessary checks again inside alterConnectorOffsetsHerderRequest
+                        // since it is being run in a separate herder request and the conditions could have changed since the
+                        // previous check
+                        addRequest(getAlterConnectorOffsetsCallable(connName, offsets, callback), forwardErrorCallback(callback));
+                    }
+                }).call();
+            } else {
+                getAlterConnectorOffsetsCallable(connName, offsets, callback).call();
+            }
+            return null;
+        }, forwardErrorCallback(callback));
+    }
+
+    private Callable<Void> getAlterConnectorOffsetsCallable(String connName, Map<Map<String, ?>, Map<String, ?>> offsets,
+                                                            Callback<Message> callback) {
+        return () -> {
+            if (!alterConnectorOffsetsChecks(connName, callback)) {
+                return null;
+            }
+            worker.alterConnectorOffsets(connName, configState.connectorConfig(connName), offsets, callback);
+            return null;
+        };
+    }
+
+    /**
+     * This method performs a few checks for alter connector offsets request and completes the callback exceptionally
+     * if any check fails.
+     * @param connName the name of the connector whose offsets are to be altered
+     * @param callback callback to invoke upon completion
+     * @return true if all the checks passed, false otherwise
+     */
+    private boolean alterConnectorOffsetsChecks(String connName, Callback<Message> callback) {
+        if (checkRebalanceNeeded(callback)) {
+            return false;
+        }
+        if (!configState.contains(connName)) {
+            callback.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null);
+            return false;
+        }
+        // If the target state for the connector is stopped, its task count is 0, and there is no rebalance pending (checked above),
+        // we can be sure that the tasks have at least been attempted to be stopped (or cancelled if they took too long to stop).
+        // Zombie tasks are handled by a round of zombie fencing for exactly once source connectors. Zombie sink tasks are handled
+        // naturally because requests to alter consumer group offsets will fail if there are still active members in the group.
+        if (configState.targetState(connName) != TargetState.STOPPED || configState.taskCount(connName) != 0) {
+            callback.onCompletion(new BadRequestException("The connector needs to be stopped before its offsets can be altered"), null);
+            return false;
+        }
+        if (!isLeader()) {
+            callback.onCompletion(new NotLeaderException("Only the leader can process alter offsets requests", leaderUrl()), null);
+            return false;
+        }

Review Comment:
   Can we move this to the beginning of the method (or maybe just after the `checkRebalanceNeeded` call)? If we're going to forward to the leader, best to do it sooner rather than later to reduce duplicated work.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/util/SinkUtils.java:
##########
@@ -52,4 +53,82 @@ public static ConnectorOffsets consumerGroupOffsetsToConnectorOffsets(Map<TopicP
 
         return new ConnectorOffsets(connectorOffsets);
     }
+
+    /**
+     * Validate that the provided partitions (keys in the {@code partitionOffsets} map) look like:
+     * <pre>
+     *     {
+     *       "kafka_topic": "topic"
+     *       "kafka_partition": 3
+     *     }
+     * </pre>
+     *
+     * and that the provided offsets (values in the {@code partitionOffsets} map) look like:
+     * <pre>
+     *     {
+     *       "kafka_offset": 1000
+     *     }
+     * </pre>
+     *
+     * This method then parses them into a mapping from {@link TopicPartition}s to their corresponding {@link Long}
+     * valued offsets.
+     *
+     * @param partitionOffsets the partitions to offset map that needs to be validated and parsed.
+     * @return the parsed mapping from {@link TopicPartition} to its corresponding {@link Long} valued offset.
+     *
+     * @throws BadRequestException if the provided offsets aren't in the expected format
+     */
+    public static Map<TopicPartition, Long> validateAndParseSinkConnectorOffsets(Map<Map<String, ?>, Map<String, ?>> partitionOffsets) {

Review Comment:
   Validation is an implied part of parsing (think `Long::parseLong`, `Integer::parseInt`, etc.).
   
   I think we can just name this `parseSinkConnectorOffsets` and leave out mention of validation from the Javadoc except for the `@throws` clause.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetS
         });
     }
 
+    /**
+     * Alter a connector's offsets.
+     *
+     * @param connName the name of the connector whose offsets are to be altered
+     * @param connectorConfig the connector's configurations
+     * @param offsets  a mapping from partitions to offsets that need to be overwritten
+     * @param cb callback to invoke upon completion
+     */
+    public void alterConnectorOffsets(String connName, Map<String, String> connectorConfig,
+                                      Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+            if (ConnectUtils.isSinkConnector(connector)) {
+                log.debug("Altering consumer group offsets for sink connector: {}", connName);
+                alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            } else {
+                log.debug("Altering offsets for source connector: {}", connName);
+                alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            }
+        }
+    }
+
+    /**
+     * Alter a sink connector's consumer group offsets.
+     * <p>
+     * Visible for testing.
+     *
+     * @param connName the name of the sink connector whose offsets are to be altered
+     * @param connector an instance of the sink connector
+     * @param connectorConfig the sink connector's configuration
+     * @param offsets a mapping from topic partitions to offsets that need to be overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                   Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                Map<TopicPartition, Long> parsedOffsets = SinkUtils.validateAndParseSinkConnectorOffsets(offsets);
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, parsedOffsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                Class<? extends Connector> sinkConnectorClass = connector.getClass();
+                Map<String, Object> adminConfig = adminConfigs(
+                        connName,
+                        "connector-worker-adminclient-" + connName,
+                        config,
+                        new SinkConnectorConfig(plugins, connectorConfig),
+                        sinkConnectorClass,
+                        connectorClientConfigOverridePolicy,
+                        kafkaClusterId,
+                        ConnectorType.SINK);
+
+                SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig);
+                String groupId = (String) baseConsumerConfigs(
+                        connName, "connector-consumer-", config, sinkConnectorConfig,
+                        sinkConnectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+
+                Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() != null)
+                        .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), new OffsetAndMetadata(entry.getValue())))
+                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

Review Comment:
   This can be simplified:
   
   ```suggestion
                           .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetS
         });
     }
 
+    /**
+     * Alter a connector's offsets.
+     *
+     * @param connName the name of the connector whose offsets are to be altered
+     * @param connectorConfig the connector's configurations
+     * @param offsets  a mapping from partitions to offsets that need to be overwritten
+     * @param cb callback to invoke upon completion
+     */
+    public void alterConnectorOffsets(String connName, Map<String, String> connectorConfig,
+                                      Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+            if (ConnectUtils.isSinkConnector(connector)) {
+                log.debug("Altering consumer group offsets for sink connector: {}", connName);
+                alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            } else {
+                log.debug("Altering offsets for source connector: {}", connName);
+                alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            }
+        }
+    }
+
+    /**
+     * Alter a sink connector's consumer group offsets.
+     * <p>
+     * Visible for testing.
+     *
+     * @param connName the name of the sink connector whose offsets are to be altered
+     * @param connector an instance of the sink connector
+     * @param connectorConfig the sink connector's configuration
+     * @param offsets a mapping from topic partitions to offsets that need to be overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                   Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                Map<TopicPartition, Long> parsedOffsets = SinkUtils.validateAndParseSinkConnectorOffsets(offsets);
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, parsedOffsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                Class<? extends Connector> sinkConnectorClass = connector.getClass();
+                Map<String, Object> adminConfig = adminConfigs(
+                        connName,
+                        "connector-worker-adminclient-" + connName,
+                        config,
+                        new SinkConnectorConfig(plugins, connectorConfig),
+                        sinkConnectorClass,
+                        connectorClientConfigOverridePolicy,
+                        kafkaClusterId,
+                        ConnectorType.SINK);
+
+                SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig);
+                String groupId = (String) baseConsumerConfigs(
+                        connName, "connector-consumer-", config, sinkConnectorConfig,
+                        sinkConnectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+
+                Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() != null)
+                        .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), new OffsetAndMetadata(entry.getValue())))
+                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+                Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() == null)
+                        .map(Map.Entry::getKey)
+                        .collect(Collectors.toSet());
+
+                KafkaFuture<Void> adminFuture = KafkaFuture.completedFuture(null);

Review Comment:
   Nit: This can be inside the scope of the `try` block starting at line 1343.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetS
         });
     }
 
+    /**
+     * Alter a connector's offsets.
+     *
+     * @param connName the name of the connector whose offsets are to be altered
+     * @param connectorConfig the connector's configurations
+     * @param offsets  a mapping from partitions to offsets that need to be overwritten
+     * @param cb callback to invoke upon completion
+     */
+    public void alterConnectorOffsets(String connName, Map<String, String> connectorConfig,
+                                      Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+            if (ConnectUtils.isSinkConnector(connector)) {
+                log.debug("Altering consumer group offsets for sink connector: {}", connName);
+                alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            } else {
+                log.debug("Altering offsets for source connector: {}", connName);
+                alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            }
+        }
+    }
+
+    /**
+     * Alter a sink connector's consumer group offsets.
+     * <p>
+     * Visible for testing.
+     *
+     * @param connName the name of the sink connector whose offsets are to be altered
+     * @param connector an instance of the sink connector
+     * @param connectorConfig the sink connector's configuration
+     * @param offsets a mapping from topic partitions to offsets that need to be overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                   Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                Map<TopicPartition, Long> parsedOffsets = SinkUtils.validateAndParseSinkConnectorOffsets(offsets);
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, parsedOffsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                Class<? extends Connector> sinkConnectorClass = connector.getClass();
+                Map<String, Object> adminConfig = adminConfigs(
+                        connName,
+                        "connector-worker-adminclient-" + connName,
+                        config,
+                        new SinkConnectorConfig(plugins, connectorConfig),
+                        sinkConnectorClass,
+                        connectorClientConfigOverridePolicy,
+                        kafkaClusterId,
+                        ConnectorType.SINK);
+
+                SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig);
+                String groupId = (String) baseConsumerConfigs(
+                        connName, "connector-consumer-", config, sinkConnectorConfig,
+                        sinkConnectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+
+                Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() != null)
+                        .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), new OffsetAndMetadata(entry.getValue())))
+                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+                Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() == null)
+                        .map(Map.Entry::getKey)
+                        .collect(Collectors.toSet());
+
+                KafkaFuture<Void> adminFuture = KafkaFuture.completedFuture(null);
+
+                Admin admin = adminFactory.apply(adminConfig);
+
+                try {
+                    if (!offsetsToAlter.isEmpty()) {
+                        log.debug("Committing the following consumer group topic partition offsets using an admin client for sink connector {}: {}.",

Review Comment:
   Nit: "topic partition" is redundant if we're already talking about "consumer offset"
   ```suggestion
                           log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.",
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetS
         });
     }
 
+    /**
+     * Alter a connector's offsets.
+     *
+     * @param connName the name of the connector whose offsets are to be altered
+     * @param connectorConfig the connector's configurations
+     * @param offsets  a mapping from partitions to offsets that need to be overwritten
+     * @param cb callback to invoke upon completion
+     */
+    public void alterConnectorOffsets(String connName, Map<String, String> connectorConfig,
+                                      Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+            if (ConnectUtils.isSinkConnector(connector)) {
+                log.debug("Altering consumer group offsets for sink connector: {}", connName);
+                alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            } else {
+                log.debug("Altering offsets for source connector: {}", connName);
+                alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            }
+        }
+    }
+
+    /**
+     * Alter a sink connector's consumer group offsets.
+     * <p>
+     * Visible for testing.
+     *
+     * @param connName the name of the sink connector whose offsets are to be altered
+     * @param connector an instance of the sink connector
+     * @param connectorConfig the sink connector's configuration
+     * @param offsets a mapping from topic partitions to offsets that need to be overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                   Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                Map<TopicPartition, Long> parsedOffsets = SinkUtils.validateAndParseSinkConnectorOffsets(offsets);
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, parsedOffsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                Class<? extends Connector> sinkConnectorClass = connector.getClass();
+                Map<String, Object> adminConfig = adminConfigs(
+                        connName,
+                        "connector-worker-adminclient-" + connName,
+                        config,
+                        new SinkConnectorConfig(plugins, connectorConfig),
+                        sinkConnectorClass,
+                        connectorClientConfigOverridePolicy,
+                        kafkaClusterId,
+                        ConnectorType.SINK);
+
+                SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig);
+                String groupId = (String) baseConsumerConfigs(
+                        connName, "connector-consumer-", config, sinkConnectorConfig,
+                        sinkConnectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+
+                Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() != null)
+                        .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), new OffsetAndMetadata(entry.getValue())))
+                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+                Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() == null)
+                        .map(Map.Entry::getKey)
+                        .collect(Collectors.toSet());
+
+                KafkaFuture<Void> adminFuture = KafkaFuture.completedFuture(null);
+
+                Admin admin = adminFactory.apply(adminConfig);
+
+                try {
+                    if (!offsetsToAlter.isEmpty()) {
+                        log.debug("Committing the following consumer group topic partition offsets using an admin client for sink connector {}: {}.",
+                                connName, offsetsToAlter);
+                        AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs(
+                                (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+                        AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter,
+                                alterConsumerGroupOffsetsOptions);
+
+                        adminFuture = alterConsumerGroupOffsetsResult.all();
+                    }
+
+                    adminFuture.whenComplete((ignored, error) -> {
+                        if (error != null) {
+                            Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                            // When a consumer group is non-empty, only group members can commit offsets. The above attempt to alter offsets via the admin
+                            // client will result in an UnknownMemberIdException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped
+                            // completely or if the connector is resumed while the alter offsets request is being processed)
+                            if (error instanceof UnknownMemberIdException) {
+                                cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " +
+                                                "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " +
+                                                "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " +
+                                                "Connect cluster may need to be restarted to get rid of the zombie sink tasks."),
+                                        null);
+                            } else {
+                                cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for topic partitions " + offsetsToAlter.keySet() + " for " +
+                                                "connector " + connName, error),
+                                        null);
+                            }
+                        } else if (!partitionsToReset.isEmpty()) {
+                            log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.",
+                                    connName, partitionsToReset);
+                            DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs(
+                                    (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+                            DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset,
+                                    deleteConsumerGroupOffsetsOptions);
+                            deleteConsumerGroupOffsetsResult.all().whenComplete((ignored2, error2) -> {
+                                Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                                if (error2 != null) {
+                                    // The attempt to delete offsets for certain topic partitions via the admin client will result in a
+                                    // GroupSubscribedToTopicException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped completely
+                                    // or if the connector is resumed while the alter offsets request is being processed).
+                                    if (error2 instanceof GroupSubscribedToTopicException) {
+                                        cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " +
+                                                        "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " +
+                                                        "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " +
+                                                        "Connect cluster may need to be restarted to get rid of the zombie sink tasks."),
+                                                null);
+                                    } else {
+                                        cb.onCompletion(new ConnectException("Failed to delete consumer group offsets for topic partitions " + partitionsToReset + " for connector "
+                                                        + connName, error2),
+                                                null);
+                                    }
+                                } else {
+                                    completeAlterOffsetsCallback(alterOffsetsResult, cb);
+                                }
+                            });
+                        } else {
+                            Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                            completeAlterOffsetsCallback(alterOffsetsResult, cb);
+                        }
+                    });
+                } catch (Throwable t) {
+                    Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                    throw t;
+                }
+            } catch (Throwable t) {
+                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to alter offsets for sink connector " + connName), null);
+            }
+        }));
+    }
+
+    /**
+     * Alter a source connector's offsets.
+     *
+     * @param connName the name of the source connector whose offsets are to be altered
+     * @param connector an instance of the source connector
+     * @param connectorConfig the source connector's configuration
+     * @param offsets a mapping from partitions to offsets that need to be overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    private void alterSourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                             Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connectorConfig, config.topicCreationEnable());
+        Map<String, Object> producerProps = config.exactlyOnceSourceEnabled()
+                ? exactlyOnceSourceTaskProducerConfigs(new ConnectorTaskId(connName, 0), config, sourceConfig,
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId)
+                : baseProducerConfigs(connName, "connector-offset-producer-" + connName, config, sourceConfig,
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId);
+        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
+
+        ConnectorOffsetBackingStore offsetStore = config.exactlyOnceSourceEnabled()
+                ? offsetStoreForExactlyOnceSourceConnector(sourceConfig, connName, connector, producer)
+                : offsetStoreForRegularSourceConnector(sourceConfig, connName, connector, producer);
+
+        OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, connName, internalKeyConverter, internalValueConverter);
+        alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, offsetStore, producer, offsetWriter, connectorLoader, cb);
+    }
+
+    // Visible for testing
+    void alterSourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                     Map<Map<String, ?>, Map<String, ?>> offsets, ConnectorOffsetBackingStore offsetStore,
+                                     KafkaProducer<byte[], byte[]> producer, OffsetStorageWriter offsetWriter,
+                                     ClassLoader connectorLoader, Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SourceConnector) connector).alterOffsets(connectorConfig, offsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                offsetStore.configure(config);
+                // This reads to the end of the offsets topic and can be a potentially time-consuming operation
+                offsetStore.start();

Review Comment:
   Doesn't this potentially leak the offset store if an exception occurs before we reach the `try` (and accompanying `finally`) block at line 1471?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetS
         });
     }
 
+    /**
+     * Alter a connector's offsets.
+     *
+     * @param connName the name of the connector whose offsets are to be altered
+     * @param connectorConfig the connector's configurations
+     * @param offsets  a mapping from partitions to offsets that need to be overwritten
+     * @param cb callback to invoke upon completion
+     */
+    public void alterConnectorOffsets(String connName, Map<String, String> connectorConfig,
+                                      Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+            if (ConnectUtils.isSinkConnector(connector)) {
+                log.debug("Altering consumer group offsets for sink connector: {}", connName);
+                alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            } else {
+                log.debug("Altering offsets for source connector: {}", connName);
+                alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            }
+        }
+    }
+
+    /**
+     * Alter a sink connector's consumer group offsets.
+     * <p>
+     * Visible for testing.
+     *
+     * @param connName the name of the sink connector whose offsets are to be altered
+     * @param connector an instance of the sink connector
+     * @param connectorConfig the sink connector's configuration
+     * @param offsets a mapping from topic partitions to offsets that need to be overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                   Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                Map<TopicPartition, Long> parsedOffsets = SinkUtils.validateAndParseSinkConnectorOffsets(offsets);
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, parsedOffsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                Class<? extends Connector> sinkConnectorClass = connector.getClass();
+                Map<String, Object> adminConfig = adminConfigs(
+                        connName,
+                        "connector-worker-adminclient-" + connName,
+                        config,
+                        new SinkConnectorConfig(plugins, connectorConfig),
+                        sinkConnectorClass,
+                        connectorClientConfigOverridePolicy,
+                        kafkaClusterId,
+                        ConnectorType.SINK);
+
+                SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig);
+                String groupId = (String) baseConsumerConfigs(
+                        connName, "connector-consumer-", config, sinkConnectorConfig,
+                        sinkConnectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+
+                Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() != null)
+                        .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), new OffsetAndMetadata(entry.getValue())))
+                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+                Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() == null)
+                        .map(Map.Entry::getKey)
+                        .collect(Collectors.toSet());
+
+                KafkaFuture<Void> adminFuture = KafkaFuture.completedFuture(null);
+
+                Admin admin = adminFactory.apply(adminConfig);
+
+                try {
+                    if (!offsetsToAlter.isEmpty()) {
+                        log.debug("Committing the following consumer group topic partition offsets using an admin client for sink connector {}: {}.",
+                                connName, offsetsToAlter);
+                        AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs(
+                                (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+                        AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter,
+                                alterConsumerGroupOffsetsOptions);
+
+                        adminFuture = alterConsumerGroupOffsetsResult.all();
+                    }
+
+                    adminFuture.whenComplete((ignored, error) -> {
+                        if (error != null) {
+                            Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);

Review Comment:
   We close the admin client in three different `if/else if/else` branches in this lambda body, which is verbose (not a huge deal) and brittle (bigger deal).
   
   If we can combine everything into a single future using the `KafkaFuture::allOf` approach, then we can put the admin cleanup into a `whenComplete` call for that future, which will be a bit cleaner.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetS
         });
     }
 
+    /**
+     * Alter a connector's offsets.
+     *
+     * @param connName the name of the connector whose offsets are to be altered
+     * @param connectorConfig the connector's configurations
+     * @param offsets  a mapping from partitions to offsets that need to be overwritten
+     * @param cb callback to invoke upon completion
+     */
+    public void alterConnectorOffsets(String connName, Map<String, String> connectorConfig,
+                                      Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+            if (ConnectUtils.isSinkConnector(connector)) {
+                log.debug("Altering consumer group offsets for sink connector: {}", connName);
+                alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            } else {
+                log.debug("Altering offsets for source connector: {}", connName);
+                alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            }
+        }
+    }
+
+    /**
+     * Alter a sink connector's consumer group offsets.
+     * <p>
+     * Visible for testing.
+     *
+     * @param connName the name of the sink connector whose offsets are to be altered
+     * @param connector an instance of the sink connector
+     * @param connectorConfig the sink connector's configuration
+     * @param offsets a mapping from topic partitions to offsets that need to be overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                   Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                Map<TopicPartition, Long> parsedOffsets = SinkUtils.validateAndParseSinkConnectorOffsets(offsets);
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, parsedOffsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                Class<? extends Connector> sinkConnectorClass = connector.getClass();
+                Map<String, Object> adminConfig = adminConfigs(
+                        connName,
+                        "connector-worker-adminclient-" + connName,
+                        config,
+                        new SinkConnectorConfig(plugins, connectorConfig),
+                        sinkConnectorClass,
+                        connectorClientConfigOverridePolicy,
+                        kafkaClusterId,
+                        ConnectorType.SINK);
+
+                SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig);
+                String groupId = (String) baseConsumerConfigs(
+                        connName, "connector-consumer-", config, sinkConnectorConfig,
+                        sinkConnectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+
+                Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() != null)
+                        .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), new OffsetAndMetadata(entry.getValue())))
+                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+                Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() == null)
+                        .map(Map.Entry::getKey)
+                        .collect(Collectors.toSet());
+
+                KafkaFuture<Void> adminFuture = KafkaFuture.completedFuture(null);
+
+                Admin admin = adminFactory.apply(adminConfig);
+
+                try {
+                    if (!offsetsToAlter.isEmpty()) {
+                        log.debug("Committing the following consumer group topic partition offsets using an admin client for sink connector {}: {}.",
+                                connName, offsetsToAlter);
+                        AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs(
+                                (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+                        AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter,
+                                alterConsumerGroupOffsetsOptions);
+
+                        adminFuture = alterConsumerGroupOffsetsResult.all();
+                    }
+
+                    adminFuture.whenComplete((ignored, error) -> {
+                        if (error != null) {
+                            Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                            // When a consumer group is non-empty, only group members can commit offsets. The above attempt to alter offsets via the admin
+                            // client will result in an UnknownMemberIdException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped
+                            // completely or if the connector is resumed while the alter offsets request is being processed)
+                            if (error instanceof UnknownMemberIdException) {
+                                cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " +
+                                                "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " +
+                                                "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " +
+                                                "Connect cluster may need to be restarted to get rid of the zombie sink tasks."),
+                                        null);
+                            } else {
+                                cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for topic partitions " + offsetsToAlter.keySet() + " for " +
+                                                "connector " + connName, error),
+                                        null);
+                            }
+                        } else if (!partitionsToReset.isEmpty()) {
+                            log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.",
+                                    connName, partitionsToReset);
+                            DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs(
+                                    (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+                            DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset,
+                                    deleteConsumerGroupOffsetsOptions);
+                            deleteConsumerGroupOffsetsResult.all().whenComplete((ignored2, error2) -> {
+                                Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                                if (error2 != null) {
+                                    // The attempt to delete offsets for certain topic partitions via the admin client will result in a
+                                    // GroupSubscribedToTopicException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped completely
+                                    // or if the connector is resumed while the alter offsets request is being processed).
+                                    if (error2 instanceof GroupSubscribedToTopicException) {
+                                        cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " +
+                                                        "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " +
+                                                        "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " +
+                                                        "Connect cluster may need to be restarted to get rid of the zombie sink tasks."),
+                                                null);
+                                    } else {
+                                        cb.onCompletion(new ConnectException("Failed to delete consumer group offsets for topic partitions " + partitionsToReset + " for connector "
+                                                        + connName, error2),
+                                                null);
+                                    }
+                                } else {
+                                    completeAlterOffsetsCallback(alterOffsetsResult, cb);
+                                }
+                            });
+                        } else {
+                            Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                            completeAlterOffsetsCallback(alterOffsetsResult, cb);
+                        }
+                    });
+                } catch (Throwable t) {
+                    Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                    throw t;
+                }
+            } catch (Throwable t) {
+                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to alter offsets for sink connector " + connName), null);
+            }
+        }));
+    }
+
+    /**
+     * Alter a source connector's offsets.
+     *
+     * @param connName the name of the source connector whose offsets are to be altered
+     * @param connector an instance of the source connector
+     * @param connectorConfig the source connector's configuration
+     * @param offsets a mapping from partitions to offsets that need to be overwritten

Review Comment:
   Same nit RE "overwritten":
   ```suggestion
        * @param offsets a mapping from partitions to offsets that need to be written
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetS
         });
     }
 
+    /**
+     * Alter a connector's offsets.
+     *
+     * @param connName the name of the connector whose offsets are to be altered
+     * @param connectorConfig the connector's configurations
+     * @param offsets  a mapping from partitions to offsets that need to be overwritten
+     * @param cb callback to invoke upon completion
+     */
+    public void alterConnectorOffsets(String connName, Map<String, String> connectorConfig,
+                                      Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+            if (ConnectUtils.isSinkConnector(connector)) {
+                log.debug("Altering consumer group offsets for sink connector: {}", connName);
+                alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            } else {
+                log.debug("Altering offsets for source connector: {}", connName);
+                alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            }
+        }
+    }
+
+    /**
+     * Alter a sink connector's consumer group offsets.
+     * <p>
+     * Visible for testing.
+     *
+     * @param connName the name of the sink connector whose offsets are to be altered
+     * @param connector an instance of the sink connector
+     * @param connectorConfig the sink connector's configuration
+     * @param offsets a mapping from topic partitions to offsets that need to be overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                   Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                Map<TopicPartition, Long> parsedOffsets = SinkUtils.validateAndParseSinkConnectorOffsets(offsets);
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, parsedOffsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                Class<? extends Connector> sinkConnectorClass = connector.getClass();
+                Map<String, Object> adminConfig = adminConfigs(
+                        connName,
+                        "connector-worker-adminclient-" + connName,
+                        config,
+                        new SinkConnectorConfig(plugins, connectorConfig),
+                        sinkConnectorClass,
+                        connectorClientConfigOverridePolicy,
+                        kafkaClusterId,
+                        ConnectorType.SINK);
+
+                SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig);
+                String groupId = (String) baseConsumerConfigs(
+                        connName, "connector-consumer-", config, sinkConnectorConfig,
+                        sinkConnectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+
+                Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() != null)
+                        .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), new OffsetAndMetadata(entry.getValue())))
+                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+                Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() == null)
+                        .map(Map.Entry::getKey)
+                        .collect(Collectors.toSet());
+
+                KafkaFuture<Void> adminFuture = KafkaFuture.completedFuture(null);
+
+                Admin admin = adminFactory.apply(adminConfig);
+
+                try {
+                    if (!offsetsToAlter.isEmpty()) {
+                        log.debug("Committing the following consumer group topic partition offsets using an admin client for sink connector {}: {}.",
+                                connName, offsetsToAlter);
+                        AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs(
+                                (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+                        AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter,
+                                alterConsumerGroupOffsetsOptions);
+
+                        adminFuture = alterConsumerGroupOffsetsResult.all();
+                    }
+
+                    adminFuture.whenComplete((ignored, error) -> {
+                        if (error != null) {
+                            Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                            // When a consumer group is non-empty, only group members can commit offsets. The above attempt to alter offsets via the admin
+                            // client will result in an UnknownMemberIdException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped
+                            // completely or if the connector is resumed while the alter offsets request is being processed)
+                            if (error instanceof UnknownMemberIdException) {
+                                cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " +
+                                                "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " +
+                                                "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " +
+                                                "Connect cluster may need to be restarted to get rid of the zombie sink tasks."),
+                                        null);
+                            } else {
+                                cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for topic partitions " + offsetsToAlter.keySet() + " for " +
+                                                "connector " + connName, error),
+                                        null);
+                            }
+                        } else if (!partitionsToReset.isEmpty()) {
+                            log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.",
+                                    connName, partitionsToReset);
+                            DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs(
+                                    (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+                            DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset,
+                                    deleteConsumerGroupOffsetsOptions);
+                            deleteConsumerGroupOffsetsResult.all().whenComplete((ignored2, error2) -> {
+                                Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                                if (error2 != null) {
+                                    // The attempt to delete offsets for certain topic partitions via the admin client will result in a
+                                    // GroupSubscribedToTopicException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped completely
+                                    // or if the connector is resumed while the alter offsets request is being processed).
+                                    if (error2 instanceof GroupSubscribedToTopicException) {
+                                        cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " +
+                                                        "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " +
+                                                        "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " +
+                                                        "Connect cluster may need to be restarted to get rid of the zombie sink tasks."),
+                                                null);
+                                    } else {
+                                        cb.onCompletion(new ConnectException("Failed to delete consumer group offsets for topic partitions " + partitionsToReset + " for connector "
+                                                        + connName, error2),
+                                                null);
+                                    }
+                                } else {
+                                    completeAlterOffsetsCallback(alterOffsetsResult, cb);
+                                }
+                            });
+                        } else {
+                            Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                            completeAlterOffsetsCallback(alterOffsetsResult, cb);
+                        }
+                    });
+                } catch (Throwable t) {
+                    Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                    throw t;
+                }
+            } catch (Throwable t) {
+                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to alter offsets for sink connector " + connName), null);
+            }
+        }));
+    }
+
+    /**
+     * Alter a source connector's offsets.
+     *
+     * @param connName the name of the source connector whose offsets are to be altered
+     * @param connector an instance of the source connector
+     * @param connectorConfig the source connector's configuration
+     * @param offsets a mapping from partitions to offsets that need to be overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    private void alterSourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                             Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connectorConfig, config.topicCreationEnable());
+        Map<String, Object> producerProps = config.exactlyOnceSourceEnabled()
+                ? exactlyOnceSourceTaskProducerConfigs(new ConnectorTaskId(connName, 0), config, sourceConfig,
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId)
+                : baseProducerConfigs(connName, "connector-offset-producer-" + connName, config, sourceConfig,
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId);
+        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
+
+        ConnectorOffsetBackingStore offsetStore = config.exactlyOnceSourceEnabled()
+                ? offsetStoreForExactlyOnceSourceConnector(sourceConfig, connName, connector, producer)
+                : offsetStoreForRegularSourceConnector(sourceConfig, connName, connector, producer);
+
+        OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, connName, internalKeyConverter, internalValueConverter);
+        alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, offsetStore, producer, offsetWriter, connectorLoader, cb);
+    }
+
+    // Visible for testing
+    void alterSourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                     Map<Map<String, ?>, Map<String, ?>> offsets, ConnectorOffsetBackingStore offsetStore,
+                                     KafkaProducer<byte[], byte[]> producer, OffsetStorageWriter offsetWriter,
+                                     ClassLoader connectorLoader, Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SourceConnector) connector).alterOffsets(connectorConfig, offsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                offsetStore.configure(config);
+                // This reads to the end of the offsets topic and can be a potentially time-consuming operation
+                offsetStore.start();
+
+                // The alterSourceConnectorOffsets method should only be called after all the connector's tasks have been stopped, and it's
+                // safe to write offsets via an offset writer
+                offsets.forEach(offsetWriter::offset);
+
+                // We can call begin flush without a timeout because this newly created single-purpose offset writer can't do concurrent
+                // offset writes. We can also ignore the return value since it returns false if and only if there is no data to be flushed,
+                // and we've just put some data in the previous statement
+                offsetWriter.beginFlush();
+
+                try {
+                    if (config.exactlyOnceSourceEnabled()) {
+                        producer.initTransactions();
+                        producer.beginTransaction();
+                    }
+                    log.debug("Committing the following partition offsets for source connector {}: {}", connName, offsets);
+                    FutureCallback<Void> offsetWriterCallback = new FutureCallback<>();
+                    offsetWriter.doFlush(offsetWriterCallback);
+                    if (config.exactlyOnceSourceEnabled()) {
+                        producer.commitTransaction();
+                    }
+                    offsetWriterCallback.get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+                } catch (ExecutionException e) {
+                    throw new ConnectException("Failed to alter offsets for source connector " + connName, e.getCause());
+                } catch (TimeoutException e) {
+                    throw new ConnectException("Timed out while attempting to alter offsets for source connector " + connName, e);
+                } catch (InterruptedException e) {
+                    throw new ConnectException("Unexpectedly interrupted while attempting to alter offsets for source connector " + connName, e);
+                } finally {
+                    offsetStore.stop();
+                }
+
+                completeAlterOffsetsCallback(alterOffsetsResult, cb);
+            } catch (Throwable t) {
+                log.error("Failed to alter offsets for source connector {}", connName, t);
+                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to alter offsets for source connector " + connName), null);
+            }
+        }));
+    }
+
+    private void completeAlterOffsetsCallback(boolean alterOffsetsResult, Callback<Message> cb) {
+        if (alterOffsetsResult) {
+            cb.onCompletion(null, new Message("The offsets for this connector have been altered successfully"));
+        } else {
+            cb.onCompletion(null, new Message("The Connect framework managed offsets for this connector have been " +

Review Comment:
   Nit: hyphen (like the KIP uses)
   ```suggestion
               cb.onCompletion(null, new Message("The Connect framework-managed offsets for this connector have been " +
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetS
         });
     }
 
+    /**
+     * Alter a connector's offsets.
+     *
+     * @param connName the name of the connector whose offsets are to be altered
+     * @param connectorConfig the connector's configurations
+     * @param offsets  a mapping from partitions to offsets that need to be overwritten
+     * @param cb callback to invoke upon completion
+     */
+    public void alterConnectorOffsets(String connName, Map<String, String> connectorConfig,
+                                      Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+            if (ConnectUtils.isSinkConnector(connector)) {
+                log.debug("Altering consumer group offsets for sink connector: {}", connName);
+                alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            } else {
+                log.debug("Altering offsets for source connector: {}", connName);
+                alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            }
+        }
+    }
+
+    /**
+     * Alter a sink connector's consumer group offsets.
+     * <p>
+     * Visible for testing.
+     *
+     * @param connName the name of the sink connector whose offsets are to be altered
+     * @param connector an instance of the sink connector
+     * @param connectorConfig the sink connector's configuration
+     * @param offsets a mapping from topic partitions to offsets that need to be overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                   Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                Map<TopicPartition, Long> parsedOffsets = SinkUtils.validateAndParseSinkConnectorOffsets(offsets);
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, parsedOffsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                Class<? extends Connector> sinkConnectorClass = connector.getClass();
+                Map<String, Object> adminConfig = adminConfigs(
+                        connName,
+                        "connector-worker-adminclient-" + connName,
+                        config,
+                        new SinkConnectorConfig(plugins, connectorConfig),
+                        sinkConnectorClass,
+                        connectorClientConfigOverridePolicy,
+                        kafkaClusterId,
+                        ConnectorType.SINK);
+
+                SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig);
+                String groupId = (String) baseConsumerConfigs(
+                        connName, "connector-consumer-", config, sinkConnectorConfig,
+                        sinkConnectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+
+                Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() != null)
+                        .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), new OffsetAndMetadata(entry.getValue())))
+                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+                Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() == null)
+                        .map(Map.Entry::getKey)
+                        .collect(Collectors.toSet());
+
+                KafkaFuture<Void> adminFuture = KafkaFuture.completedFuture(null);
+
+                Admin admin = adminFactory.apply(adminConfig);
+
+                try {
+                    if (!offsetsToAlter.isEmpty()) {
+                        log.debug("Committing the following consumer group topic partition offsets using an admin client for sink connector {}: {}.",
+                                connName, offsetsToAlter);
+                        AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs(
+                                (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+                        AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter,
+                                alterConsumerGroupOffsetsOptions);
+
+                        adminFuture = alterConsumerGroupOffsetsResult.all();
+                    }
+
+                    adminFuture.whenComplete((ignored, error) -> {
+                        if (error != null) {
+                            Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                            // When a consumer group is non-empty, only group members can commit offsets. The above attempt to alter offsets via the admin
+                            // client will result in an UnknownMemberIdException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped
+                            // completely or if the connector is resumed while the alter offsets request is being processed)
+                            if (error instanceof UnknownMemberIdException) {
+                                cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " +
+                                                "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " +
+                                                "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " +
+                                                "Connect cluster may need to be restarted to get rid of the zombie sink tasks."),
+                                        null);
+                            } else {
+                                cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for topic partitions " + offsetsToAlter.keySet() + " for " +
+                                                "connector " + connName, error),
+                                        null);
+                            }
+                        } else if (!partitionsToReset.isEmpty()) {
+                            log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.",
+                                    connName, partitionsToReset);
+                            DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs(
+                                    (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+                            DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset,
+                                    deleteConsumerGroupOffsetsOptions);
+                            deleteConsumerGroupOffsetsResult.all().whenComplete((ignored2, error2) -> {
+                                Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                                if (error2 != null) {
+                                    // The attempt to delete offsets for certain topic partitions via the admin client will result in a
+                                    // GroupSubscribedToTopicException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped completely
+                                    // or if the connector is resumed while the alter offsets request is being processed).
+                                    if (error2 instanceof GroupSubscribedToTopicException) {
+                                        cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " +
+                                                        "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " +
+                                                        "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " +
+                                                        "Connect cluster may need to be restarted to get rid of the zombie sink tasks."),
+                                                null);
+                                    } else {
+                                        cb.onCompletion(new ConnectException("Failed to delete consumer group offsets for topic partitions " + partitionsToReset + " for connector "
+                                                        + connName, error2),
+                                                null);
+                                    }
+                                } else {
+                                    completeAlterOffsetsCallback(alterOffsetsResult, cb);
+                                }
+                            });
+                        } else {
+                            Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                            completeAlterOffsetsCallback(alterOffsetsResult, cb);
+                        }
+                    });
+                } catch (Throwable t) {
+                    Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                    throw t;
+                }
+            } catch (Throwable t) {
+                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to alter offsets for sink connector " + connName), null);
+            }
+        }));
+    }
+
+    /**
+     * Alter a source connector's offsets.
+     *
+     * @param connName the name of the source connector whose offsets are to be altered
+     * @param connector an instance of the source connector
+     * @param connectorConfig the source connector's configuration
+     * @param offsets a mapping from partitions to offsets that need to be overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    private void alterSourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                             Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connectorConfig, config.topicCreationEnable());
+        Map<String, Object> producerProps = config.exactlyOnceSourceEnabled()
+                ? exactlyOnceSourceTaskProducerConfigs(new ConnectorTaskId(connName, 0), config, sourceConfig,
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId)
+                : baseProducerConfigs(connName, "connector-offset-producer-" + connName, config, sourceConfig,
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId);
+        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
+
+        ConnectorOffsetBackingStore offsetStore = config.exactlyOnceSourceEnabled()
+                ? offsetStoreForExactlyOnceSourceConnector(sourceConfig, connName, connector, producer)
+                : offsetStoreForRegularSourceConnector(sourceConfig, connName, connector, producer);
+
+        OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, connName, internalKeyConverter, internalValueConverter);
+        alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, offsetStore, producer, offsetWriter, connectorLoader, cb);
+    }
+
+    // Visible for testing
+    void alterSourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                     Map<Map<String, ?>, Map<String, ?>> offsets, ConnectorOffsetBackingStore offsetStore,
+                                     KafkaProducer<byte[], byte[]> producer, OffsetStorageWriter offsetWriter,
+                                     ClassLoader connectorLoader, Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SourceConnector) connector).alterOffsets(connectorConfig, offsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                offsetStore.configure(config);
+                // This reads to the end of the offsets topic and can be a potentially time-consuming operation
+                offsetStore.start();
+
+                // The alterSourceConnectorOffsets method should only be called after all the connector's tasks have been stopped, and it's
+                // safe to write offsets via an offset writer
+                offsets.forEach(offsetWriter::offset);
+
+                // We can call begin flush without a timeout because this newly created single-purpose offset writer can't do concurrent
+                // offset writes. We can also ignore the return value since it returns false if and only if there is no data to be flushed,
+                // and we've just put some data in the previous statement
+                offsetWriter.beginFlush();
+
+                try {
+                    if (config.exactlyOnceSourceEnabled()) {
+                        producer.initTransactions();
+                        producer.beginTransaction();
+                    }
+                    log.debug("Committing the following partition offsets for source connector {}: {}", connName, offsets);
+                    FutureCallback<Void> offsetWriterCallback = new FutureCallback<>();
+                    offsetWriter.doFlush(offsetWriterCallback);
+                    if (config.exactlyOnceSourceEnabled()) {
+                        producer.commitTransaction();
+                    }
+                    offsetWriterCallback.get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);

Review Comment:
   If the connector uses a separate offsets topic, this doesn't guarantee that the request to alter offsets has successfully completed on the worker's global offsets topic (see [here](https://github.com/apache/kafka/blob/228434d23583189cdcaa7f4a90ebb178ccc17c73/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java#L288-L289)).
   
   I think we may have to:
   - Detect when a connector-specific offsets store is necessary, and if it is:
   - - Construct just that store (without the secondary global offsets store)
   - - Alter the offsets for the connector using that store
   - Then, regardless of whether a connector-specific store was necessary, alter the offsets for the connector using the worker's global offsets store



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1512,11 +1773,25 @@ public WorkerTask doBuild(Task task,
         }
     }
 
-    // Visible for testing
+    /**
+     * Builds and returns an offset backing store for a regular source connector (i.e. when exactly-once support for source connectors is disabled).
+     * The offset backing store will either be just the worker's global offset backing store (if the connector doesn't define a connector-specific
+     * offset topic via its configs), just the connector-specific offset backing store (if the connector defines a connector-specific offsets
+     * topic which happens to be the same as the worker's global offset topic) or a combination of both the worker's global offset backing store

Review Comment:
   Nit: the way we determine this is susceptible to edge cases where the topics aren't the same
   ```suggestion
        * topic which appears to be the same as the worker's global offset topic) or a combination of both the worker's global offset backing store
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1576,11 +1854,24 @@ ConnectorOffsetBackingStore offsetStoreForRegularSourceConnector(
         }
     }
 
-    // Visible for testing
+    /**
+     * Builds and returns an offset backing store for an exactly-once source connector. The offset backing store will either be just
+     * the connector-specific offset backing store (if the connector defines a connector-specific offsets topic which happens to be

Review Comment:
   The connector doesn't have to define a custom offsets topic, in which case, we also use just a connector-specific store.
   ```suggestion
        * the connector-specific offset backing store (if the connector's offsets topic is
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetS
         });
     }
 
+    /**
+     * Alter a connector's offsets.
+     *
+     * @param connName the name of the connector whose offsets are to be altered
+     * @param connectorConfig the connector's configurations
+     * @param offsets  a mapping from partitions to offsets that need to be overwritten
+     * @param cb callback to invoke upon completion
+     */
+    public void alterConnectorOffsets(String connName, Map<String, String> connectorConfig,
+                                      Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+            if (ConnectUtils.isSinkConnector(connector)) {
+                log.debug("Altering consumer group offsets for sink connector: {}", connName);
+                alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            } else {
+                log.debug("Altering offsets for source connector: {}", connName);
+                alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            }
+        }
+    }
+
+    /**
+     * Alter a sink connector's consumer group offsets.
+     * <p>
+     * Visible for testing.
+     *
+     * @param connName the name of the sink connector whose offsets are to be altered
+     * @param connector an instance of the sink connector
+     * @param connectorConfig the sink connector's configuration
+     * @param offsets a mapping from topic partitions to offsets that need to be overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                   Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                Map<TopicPartition, Long> parsedOffsets = SinkUtils.validateAndParseSinkConnectorOffsets(offsets);
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, parsedOffsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                Class<? extends Connector> sinkConnectorClass = connector.getClass();
+                Map<String, Object> adminConfig = adminConfigs(
+                        connName,
+                        "connector-worker-adminclient-" + connName,
+                        config,
+                        new SinkConnectorConfig(plugins, connectorConfig),
+                        sinkConnectorClass,
+                        connectorClientConfigOverridePolicy,
+                        kafkaClusterId,
+                        ConnectorType.SINK);
+
+                SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig);
+                String groupId = (String) baseConsumerConfigs(
+                        connName, "connector-consumer-", config, sinkConnectorConfig,
+                        sinkConnectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+
+                Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() != null)
+                        .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), new OffsetAndMetadata(entry.getValue())))
+                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+                Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() == null)
+                        .map(Map.Entry::getKey)
+                        .collect(Collectors.toSet());
+
+                KafkaFuture<Void> adminFuture = KafkaFuture.completedFuture(null);
+
+                Admin admin = adminFactory.apply(adminConfig);
+
+                try {

Review Comment:
   I like the idea of performing both operations simultaneously and combining them with `KafkaFuture::allOf`. This optimizes for the happy-path case and the tradeoff if one of the operations fails isn't significant; the user knows that failed calls to the offset reset API can leaves the offsets for a connector in an inconsistent state, and can always check on the latest ones via the offset read API.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org