Posted to jira@kafka.apache.org by "C0urante (via GitHub)" <gi...@apache.org> on 2023/03/23 15:40:02 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #13434: KAFKA-14785: Connect offset read REST API

C0urante commented on code in PR #13434:
URL: https://github.com/apache/kafka/pull/13434#discussion_r1146391973


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1139,106 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector.
+     * @param connName the name of the connector whose offsets are to be retrieved
+     * @param connectorConfig the connector's configurations
+     * @return the connector's offsets
+     */
+    public ConnectorOffsets connectorOffsets(String connName, Map<String, String> connectorConfig) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+        }
+
+        if (ConnectUtils.isSinkConnector(connector)) {
+            log.debug("Fetching offsets for sink connector: {}", connName);
+            return sinkConnectorOffsets(connName, connector, connectorConfig);
+        } else {
+            log.debug("Fetching offsets for source connector: {}", connName);
+            return sourceConnectorOffsets(connName, connector, connectorConfig);
+        }
+    }
+
+    /**
+     * Get the current consumer group offsets for a sink connector.
+     * @param connName the name of the sink connector whose offsets are to be retrieved
+     * @param connector the sink connector
+     * @param connectorConfig the sink connector's configurations
+     * @return the consumer group offsets for the sink connector
+     */
+    private ConnectorOffsets sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig) {
+        return sinkConnectorOffsets(connName, connector, connectorConfig, Admin::create);
+    }
+
+    // Visible for testing; allows us to mock out the Admin client for testing
+    ConnectorOffsets sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                          Function<Map<String, Object>, Admin> adminFactory) {
+        Map<String, Object> adminConfig = adminConfigs(
+                connName,
+                "connector-worker-adminclient-" + connName,
+                config,
+                new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(),
+                connectorClientConfigOverridePolicy,
+                kafkaClusterId,
+                ConnectorType.SOURCE);
+        String groupId = (String) baseConsumerConfigs(
+                connName, "connector-consumer-", config, new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+        Admin admin = adminFactory.apply(adminConfig);
+        try {
+            ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = admin.listConsumerGroupOffsets(groupId);

Review Comment:
   I don't think we need to worry too much about this. I cannot imagine a single sane use case that involves pointing a connector's Kafka clients at different Kafka clusters (not just different bootstrap servers, but actually different clusters) for producer/consumer/admin. I'd be fine with adding a note to our docs that this kind of setup isn't supported, but I really, really hope that it's not necessary and that nobody's trying to do that in the first place.
   
   That said, there is a different case we may want to consider: someone may have configured consumer overrides for a sink connector, but not admin overrides. This may happen if they don't use a DLQ topic. I don't know if we absolutely need to handle this now, and we may consider filing a follow-up ticket to look into it, but one quick-and-dirty thought I've had is to configure the admin client used here with a combination of the configurations for the connector's admin client and its consumer, giving precedence to the latter.
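
   A rough sketch of that quick-and-dirty idea, using only plain maps so it stands alone. The class name, the method name, and the `allowedAdminKeys` parameter are hypothetical and not part of this PR; the filter is an assumption on my part, meant to keep consumer-only properties such as `group.id` out of the admin client's configuration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of the PR: illustrates "admin config plus
// consumer config, with the consumer's values taking precedence".
public class MergedAdminConfigSketch {

    static Map<String, Object> mergeWithConsumerPrecedence(
            Map<String, Object> adminConfig,
            Map<String, Object> consumerConfig,
            Set<String> allowedAdminKeys) {
        // Start from the connector's admin client configuration.
        Map<String, Object> merged = new HashMap<>(adminConfig);
        // Overlay the consumer's values; on a key collision the consumer wins.
        // Assumption: only keys the admin client understands are copied over.
        for (Map.Entry<String, Object> entry : consumerConfig.entrySet()) {
            if (allowedAdminKeys.contains(entry.getKey())) {
                merged.put(entry.getKey(), entry.getValue());
            }
        }
        return merged;
    }
}
```

   With something like this, a sink connector that only overrides `bootstrap.servers` for its consumer would have its offsets read from the cluster that the consumer actually talks to, even if no admin overrides were configured.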


