You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/01 21:53:06 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #11776: KAFKA-10000: Add new preflight connector config validation logic (KIP-618)

C0urante commented on code in PR #11776:
URL: https://github.com/apache/kafka/pull/11776#discussion_r887332195


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -842,21 +846,138 @@ public void deleteConnectorConfig(final String connName, final Callback<Created<
     }
 
     @Override
-    protected Map<String, ConfigValue> validateBasicConnectorConfig(Connector connector,
-                                                                    ConfigDef configDef,
-                                                                    Map<String, String> config) {
-        Map<String, ConfigValue> validatedConfig = super.validateBasicConnectorConfig(connector, configDef, config);
-        if (connector instanceof SinkConnector) {
-            ConfigValue validatedName = validatedConfig.get(ConnectorConfig.NAME_CONFIG);
-            String name = (String) validatedName.value();
-            if (workerGroupId.equals(SinkUtils.consumerGroupId(name))) {
-                validatedName.addErrorMessage("Consumer group for sink connector named " + name +
-                        " conflicts with Connect worker group " + workerGroupId);
+    protected Map<String, ConfigValue> validateSinkConnectorConfig(SinkConnector connector, ConfigDef configDef, Map<String, String> config) {
+        Map<String, ConfigValue> result = super.validateSinkConnectorConfig(connector, configDef, config);
+        validateSinkConnectorGroupId(result);
+        return result;
+    }
+
+    @Override
+    protected Map<String, ConfigValue> validateSourceConnectorConfig(SourceConnector connector, ConfigDef configDef, Map<String, String> config) {
+        Map<String, ConfigValue> result = super.validateSourceConnectorConfig(connector, configDef, config);
+        validateSourceConnectorExactlyOnceSupport(config, result, connector);
+        validateSourceConnectorTransactionBoundary(config, result, connector);
+        return result;
+    }
+
+
+    private void validateSinkConnectorGroupId(Map<String, ConfigValue> validatedConfig) {
+        ConfigValue validatedName = validatedConfig.get(ConnectorConfig.NAME_CONFIG);
+        String name = (String) validatedName.value();

Review Comment:
   Yes; every framework-level connector property should have a `ConfigValue` entry in the map here.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -842,21 +846,138 @@ public void deleteConnectorConfig(final String connName, final Callback<Created<
     }
 
     @Override
-    protected Map<String, ConfigValue> validateBasicConnectorConfig(Connector connector,
-                                                                    ConfigDef configDef,
-                                                                    Map<String, String> config) {
-        Map<String, ConfigValue> validatedConfig = super.validateBasicConnectorConfig(connector, configDef, config);
-        if (connector instanceof SinkConnector) {
-            ConfigValue validatedName = validatedConfig.get(ConnectorConfig.NAME_CONFIG);
-            String name = (String) validatedName.value();
-            if (workerGroupId.equals(SinkUtils.consumerGroupId(name))) {
-                validatedName.addErrorMessage("Consumer group for sink connector named " + name +
-                        " conflicts with Connect worker group " + workerGroupId);
+    protected Map<String, ConfigValue> validateSinkConnectorConfig(SinkConnector connector, ConfigDef configDef, Map<String, String> config) {
+        Map<String, ConfigValue> result = super.validateSinkConnectorConfig(connector, configDef, config);
+        validateSinkConnectorGroupId(result);
+        return result;
+    }
+
+    @Override
+    protected Map<String, ConfigValue> validateSourceConnectorConfig(SourceConnector connector, ConfigDef configDef, Map<String, String> config) {
+        Map<String, ConfigValue> result = super.validateSourceConnectorConfig(connector, configDef, config);
+        validateSourceConnectorExactlyOnceSupport(config, result, connector);
+        validateSourceConnectorTransactionBoundary(config, result, connector);
+        return result;
+    }
+
+
+    private void validateSinkConnectorGroupId(Map<String, ConfigValue> validatedConfig) {
+        ConfigValue validatedName = validatedConfig.get(ConnectorConfig.NAME_CONFIG);
+        String name = (String) validatedName.value();
+        if (workerGroupId.equals(SinkUtils.consumerGroupId(name))) {
+            validatedName.addErrorMessage("Consumer group for sink connector named " + name +
+                    " conflicts with Connect worker group " + workerGroupId);
+        }
+    }
+
+    private void validateSourceConnectorExactlyOnceSupport(
+            Map<String, String> rawConfig,
+            Map<String, ConfigValue> validatedConfig,
+            SourceConnector connector) {
+        ConfigValue validatedExactlyOnceSupport = validatedConfig.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG);
+        if (validatedExactlyOnceSupport.errorMessages().isEmpty()) {
+            // Should be safe to parse the enum from the user-provided value since it's passed validation so far
+            SourceConnectorConfig.ExactlyOnceSupportLevel exactlyOnceSupportLevel =
+                    SourceConnectorConfig.ExactlyOnceSupportLevel.fromProperty(Objects.toString(validatedExactlyOnceSupport.value()));
+            if (SourceConnectorConfig.ExactlyOnceSupportLevel.REQUIRED.equals(exactlyOnceSupportLevel)) {
+                if (!config.exactlyOnceSourceEnabled()) {
+                    validatedExactlyOnceSupport.addErrorMessage("This worker does not have exactly-once source support enabled.");
+                }
+
+                try {
+                    ExactlyOnceSupport exactlyOnceSupport = connector.exactlyOnceSupport(rawConfig);
+                    if (!ExactlyOnceSupport.SUPPORTED.equals(exactlyOnceSupport)) {
+                        final String validationErrorMessage;
+                        // Would do a switch here but that doesn't permit matching on null values
+                        if (exactlyOnceSupport == null) {
+                            validationErrorMessage = "The connector does not implement the API required for preflight validation of exactly-once "
+                                    + "source support. Please consult the documentation for the connector to determine whether it supports exactly-once "
+                                    + "guarantees, and then consider reconfiguring the connector to use the value \""
+                                    + SourceConnectorConfig.ExactlyOnceSupportLevel.REQUESTED
+                                    + "\" for this property (which will disable this preflight check and allow the connector to be created).";
+                        } else if (ExactlyOnceSupport.UNSUPPORTED.equals(exactlyOnceSupport)) {
+                            validationErrorMessage = "The connector does not support exactly-once delivery guarantees with the provided configuration.";
+                        } else {
+                            throw new ConnectException("Unexpected value returned from SourceConnector::exactlyOnceSupport: " + exactlyOnceSupport);
+                        }
+                        validatedExactlyOnceSupport.addErrorMessage(validationErrorMessage);
+                    }
+                } catch (Exception e) {
+                    log.error("Failed while validating connector support for exactly-once guarantees", e);
+                    String validationErrorMessage = "An unexpected error occurred during validation";
+                    String failureMessage = e.getMessage();
+                    if (failureMessage != null && !failureMessage.trim().isEmpty()) {
+                        validationErrorMessage += ": " + failureMessage.trim();
+                    } else {
+                        validationErrorMessage += "; please see the worker logs for more details.";
+                    }
+                    validatedExactlyOnceSupport.addErrorMessage(validationErrorMessage);
+                }
             }
         }
-        return validatedConfig;
     }
 
+    private void validateSourceConnectorTransactionBoundary(
+            Map<String, String> rawConfig,
+            Map<String, ConfigValue> validatedConfig,
+            SourceConnector connector) {
+        ConfigValue validatedTransactionBoundary = validatedConfig.get(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG);
+        if (validatedTransactionBoundary.errorMessages().isEmpty()) {
+            // Should be safe to parse the enum from the user-provided value since it's passed validation so far
+            SourceTask.TransactionBoundary transactionBoundary =
+                    SourceTask.TransactionBoundary.fromProperty(Objects.toString(validatedTransactionBoundary.value()));
+            if (SourceTask.TransactionBoundary.CONNECTOR.equals(transactionBoundary)) {
+                try {
+                    ConnectorTransactionBoundaries connectorTransactionSupport = connector.canDefineTransactionBoundaries(rawConfig);
+                    if (connectorTransactionSupport == null) {
+                        validatedTransactionBoundary.addErrorMessage(
+                                "This connector has returned a null value from its canDefineTransactionBoundaries method, which is not permitted. " +
+                                        "The connector will be treated as if it cannot define its own transaction boundaries, and cannot be configured with " +
+                                        "'" + SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG + "' set to '" + SourceTask.TransactionBoundary.CONNECTOR + "'."
+                        );
+                    } else if (!ConnectorTransactionBoundaries.SUPPORTED.equals(connectorTransactionSupport)) {
+                        validatedTransactionBoundary.addErrorMessage(
+                                "The connector does not support connector-defined transaction boundaries with the given configuration. "
+                                        + "Please reconfigure it to use a different transaction boundary definition.");
+                    }
+                } catch (Exception e) {
+                    log.error("Failed while validating connector support for defining its own transaction boundaries", e);
+                    String validationErrorMessage = "An unexpected error occurred during validation";
+                    String failureMessage = e.getMessage();
+                    if (failureMessage != null && !failureMessage.trim().isEmpty()) {
+                        validationErrorMessage += ": " + failureMessage.trim();
+                    } else {
+                        validationErrorMessage += "; please see the worker logs for more details.";
+                    }
+                    validatedTransactionBoundary.addErrorMessage(validationErrorMessage);
+                }
+            }
+        }
+    }
+
+    @Override
+    protected boolean connectorUsesAdmin(org.apache.kafka.connect.health.ConnectorType connectorType, Map<String, String> connProps) {
+        if (super.connectorUsesAdmin(connectorType, connProps)) {
+            return true;
+        } else if (connectorType == org.apache.kafka.connect.health.ConnectorType.SOURCE) {
+            return config.exactlyOnceSourceEnabled()
+                || !connProps.getOrDefault(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, "").trim().isEmpty();
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    protected boolean connectorUsesConsumer(org.apache.kafka.connect.health.ConnectorType connectorType, Map<String, String> connProps) {
+        if (super.connectorUsesConsumer(connectorType, connProps)) {
+            return true;
+        } else if (connectorType == org.apache.kafka.connect.health.ConnectorType.SOURCE) {
+            return config.exactlyOnceSourceEnabled()
+                || !connProps.getOrDefault(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, "").trim().isEmpty();
+        } else {
+            return false;
+        }

Review Comment:
   👍 done.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -790,22 +794,231 @@ public void testCreateConnectorFailedValidation() throws Exception {
         PowerMock.verifyAll();
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void testConnectorNameConflictsWithWorkerGroupId() {
         Map<String, String> config = new HashMap<>(CONN2_CONFIG);
         config.put(ConnectorConfig.NAME_CONFIG, "test-group");
 
-        Connector connectorMock = PowerMock.createMock(SinkConnector.class);
+        SinkConnector connectorMock = PowerMock.createMock(SinkConnector.class);
+
+        PowerMock.replayAll(connectorMock);
 
         // CONN2 creation should fail because the worker group id (connect-test-group) conflicts with
         // the consumer group id we would use for this sink
-        Map<String, ConfigValue> validatedConfigs =
-            herder.validateBasicConnectorConfig(connectorMock, ConnectorConfig.configDef(), config);
+        Map<String, ConfigValue> validatedConfigs = herder.validateSinkConnectorConfig(
+                connectorMock, SinkConnectorConfig.configDef(), config);
 
         ConfigValue nameConfig = validatedConfigs.get(ConnectorConfig.NAME_CONFIG);
-        assertNotNull(nameConfig.errorMessages());
         assertFalse(nameConfig.errorMessages().isEmpty());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testExactlyOnceSourceSupportValidation() {
+        herder = exactlyOnceHerder();
+        Map<String, String> config = new HashMap<>();
+        config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG, REQUIRED.toString());
+
+        SourceConnector connectorMock = PowerMock.createMock(SourceConnector.class);
+        EasyMock.expect(connectorMock.exactlyOnceSupport(EasyMock.eq(config)))
+                .andReturn(ExactlyOnceSupport.SUPPORTED);
+
+        PowerMock.replayAll(connectorMock);
+
+        Map<String, ConfigValue> validatedConfigs = herder.validateSourceConnectorConfig(
+                connectorMock, SourceConnectorConfig.configDef(), config);
+
+        ConfigValue exactlyOnceSupportConfig = validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG);
+        assertTrue(exactlyOnceSupportConfig.errorMessages().isEmpty());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testExactlyOnceSourceSupportValidationOnUnsupportedConnector() {
+        herder = exactlyOnceHerder();
+        Map<String, String> config = new HashMap<>();
+        config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG, REQUIRED.toString());
+
+        SourceConnector connectorMock = PowerMock.createMock(SourceConnector.class);
+        EasyMock.expect(connectorMock.exactlyOnceSupport(EasyMock.eq(config)))
+                .andReturn(ExactlyOnceSupport.UNSUPPORTED);
+
+        PowerMock.replayAll(connectorMock);
+
+        Map<String, ConfigValue> validatedConfigs = herder.validateSourceConnectorConfig(
+                connectorMock, SourceConnectorConfig.configDef(), config);
+
+        ConfigValue exactlyOnceSupportConfig = validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG);
+        assertFalse(exactlyOnceSupportConfig.errorMessages().isEmpty());

Review Comment:
   Ack, done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org