Posted to jira@kafka.apache.org by "yashmayya (via GitHub)" <gi...@apache.org> on 2023/04/05 07:03:49 UTC

[GitHub] [kafka] yashmayya commented on a diff in pull request #13424: KAFKA-14783 (KIP-875): New STOPPED state for connectors

yashmayya commented on code in PR #13424:
URL: https://github.com/apache/kafka/pull/13424#discussion_r1158101868


##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java:
##########
@@ -553,6 +570,54 @@ public ActiveTopicsInfo connectorTopics(String connectorName) {
                 "Could not read connector state. Error response: " + responseToString(response));
     }
 
+    /**
+     * Get the info of a connector running in this cluster (retrieved via the <code>GET /connectors/{connector}</code> endpoint).
+
+     * @param connectorName name of the connector
+     * @return an instance of {@link ConnectorInfo} populated with state information of the connector and its tasks.
+     */
+    public ConnectorInfo connectorInfo(String connectorName) {
+        ObjectMapper mapper = new ObjectMapper();
+        String url = endpointForResource(String.format("connectors/%s", connectorName));
+        Response response = requestGet(url);
+        try {
+            if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) {
+                return mapper.readValue(responseToString(response), ConnectorInfo.class);
+            }
+        } catch (IOException e) {
+            log.error("Could not read connector info from response: {}",
+                    responseToString(response), e);
+            throw new ConnectException("Could not parse connector info", e);
+        }
+        throw new ConnectRestException(response.getStatus(),
+                "Could not read connector info. Error response: " + responseToString(response));
+    }
+
+    /**
+     * Get the task configs of a connector running in this cluster.
+
+     * @param connectorName name of the connector
+     * @return a map from task ID (connector name + "-" + task number) to task config
+     */
+    public Map<String, Map<String, String>> taskConfigs(String connectorName) {
+        ObjectMapper mapper = new ObjectMapper();
+        String url = endpointForResource(String.format("connectors/%s/tasks-config", connectorName));

Review Comment:
   >  I also wonder if GET /connectors/{connector}/tasks-config should avoid returning the raw configs to avoid leaking values returned by providers.
   
   Yeah, that's a good point as well. With certain config providers, Connect operators may not be able to access the raw secrets directly but will still have access to the worker's REST API (and the worker will obviously have access to the secrets from the provider), right?
   
   Also, if we choose to make that change (I assume it'll require a KIP because it's changing the response of a public REST API), the two APIs would essentially become identical. Would it be possible to deprecate / remove one of the APIs?
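
To make the concern concrete, here is a rough sketch of the difference between a config that still holds a provider placeholder and the same config after resolution. This is not Connect's actual code: `PlaceholderSketch`, `resolve`, and the in-memory `secrets` map are made-up names for illustration, and the regex only handles the simple `${provider:path:key}` form.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration only; not part of the Connect runtime.
final class PlaceholderSketch {
    // Simplified placeholder form: ${provider:path:key} (assumes no ':' in the path).
    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\$\\{([^}:]+):([^}:]*):([^}]+)\\}");

    // Returns a copy of the config with each placeholder replaced by the
    // secret stored under its key; unknown keys keep their placeholder.
    static Map<String, String> resolve(Map<String, String> config, Map<String, String> secrets) {
        Map<String, String> resolved = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            Matcher matcher = PLACEHOLDER.matcher(entry.getValue());
            StringBuffer value = new StringBuffer();
            while (matcher.find()) {
                String secret = secrets.getOrDefault(matcher.group(3), matcher.group());
                matcher.appendReplacement(value, Matcher.quoteReplacement(secret));
            }
            matcher.appendTail(value);
            resolved.put(entry.getKey(), value.toString());
        }
        return resolved;
    }
}
```

A REST response built from the input map exposes only the placeholder, while one built from the output of `resolve` exposes the secret itself, which is the leak being discussed for operators who can reach the REST API but not the provider.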


