You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2017/04/03 20:09:31 UTC

kafka git commit: KAFKA-4878: Improved Invalid Connect Config Error Message

Repository: kafka
Updated Branches:
  refs/heads/trunk 800d29648 -> 040fde8ec


KAFKA-4878: Improved Invalid Connect Config Error Message

Addresses https://issues.apache.org/jira/browse/KAFKA-4878

* Adjusted the error message to explicitly state errors and their number
* De-duplicated (DRYed up) the logic for generating the message, which is now shared between the standalone and distributed herders

Example

Misspelling two config keys in the file source config (`namse` instead of `name`, `fisle` instead of `file`):
```
namse=local-file-source
connector.class=FileStreamSource
tasks.max=1
fisle=test.txt
topic=connect-test
```

Produces:

```
[2017-03-22 08:57:11,896] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:99)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
Missing required configuration "file" which has no default value.
Missing required configuration "name" which has no default value.
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
```

Author: Armin Braun <me...@obrown.io>

Reviewers: Gwen Shapira, Konstantine Karantasis, Ewen Cheslack-Postava

Closes #2722 from original-brownbear/KAFKA-4878


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/040fde8e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/040fde8e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/040fde8e

Branch: refs/heads/trunk
Commit: 040fde8ec1f94856799c6bb47fa23a73c20bda66
Parents: 800d296
Author: Armin Braun <me...@obrown.io>
Authored: Mon Apr 3 13:09:10 2017 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Mon Apr 3 13:09:10 2017 -0700

----------------------------------------------------------------------
 .../kafka/connect/runtime/AbstractHerder.java   | 36 +++++++++++++++++
 .../runtime/distributed/DistributedHerder.java  |  7 +---
 .../runtime/standalone/StandaloneHerder.java    |  8 +---
 .../standalone/StandaloneHerderTest.java        | 41 ++++++++++++++++++++
 4 files changed, 80 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/040fde8e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 6a16185..9e5342e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -27,11 +27,13 @@ import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
 import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 
 import java.io.ByteArrayOutputStream;
@@ -338,6 +340,40 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
         }
     }
 
+    /**
+     * Checks a given {@link ConfigInfos} for validation error messages and adds an exception
+     * to the given {@link Callback} if any were found.
+     *
+     * @param configInfos configInfos to read Errors from
+     * @param callback callback to add config error exception to
+     * @return true if errors were found in the config
+     */
+    protected final boolean maybeAddConfigErrors(
+        ConfigInfos configInfos,
+        Callback<Created<ConnectorInfo>> callback
+    ) {
+        int errors = configInfos.errorCount();
+        boolean hasErrors = errors > 0;
+        if (hasErrors) {
+            StringBuilder messages = new StringBuilder();
+            messages.append("Connector configuration is invalid and contains the following ")
+                .append(errors).append(" error(s):");
+            for (ConfigInfo configInfo : configInfos.values()) {
+                for (String msg : configInfo.configValue().errors()) {
+                    messages.append('\n').append(msg);
+                }
+            }
+            callback.onCompletion(
+                new BadRequestException(
+                    messages.append(
+                        "\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"
+                    ).toString()
+                ), null
+            );
+        }
+        return hasErrors;
+    }
+
     private String trace(Throwable t) {
         ByteArrayOutputStream output = new ByteArrayOutputStream();
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/040fde8e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index a76fd03..cf30aca 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -35,10 +35,8 @@ import org.apache.kafka.connect.runtime.SourceConnectorConfig;
 import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.rest.RestServer;
-import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
-import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
 import org.apache.kafka.connect.storage.StatusBackingStore;
@@ -507,10 +505,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                 new Callable<Void>() {
                     @Override
                     public Void call() throws Exception {
-                        ConfigInfos validatedConfig = validateConnectorConfig(config);
-                        if (validatedConfig.errorCount() > 0) {
-                            callback.onCompletion(new BadRequestException("Connector configuration is invalid " +
-                                    "(use the endpoint `/{connectorType}/config/validate` to get a full list of errors)"), null);
+                        if (maybeAddConfigErrors(validateConnectorConfig(config), callback)) {
                             return null;
                         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/040fde8e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index ca0130e..9c8c7ae 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -27,10 +27,8 @@ import org.apache.kafka.connect.runtime.SourceConnectorConfig;
 import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
-import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
-import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
 import org.apache.kafka.connect.storage.MemoryConfigBackingStore;
 import org.apache.kafka.connect.storage.MemoryStatusBackingStore;
@@ -45,6 +43,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
+
 /**
  * Single process, in-memory "herder". Useful for a standalone Kafka Connect process.
  */
@@ -155,10 +154,7 @@ public class StandaloneHerder extends AbstractHerder {
                                                 boolean allowReplace,
                                                 final Callback<Created<ConnectorInfo>> callback) {
         try {
-            ConfigInfos validatedConfig = validateConnectorConfig(config);
-            if (validatedConfig.errorCount() > 0) {
-                callback.onCompletion(new BadRequestException("Connector configuration is invalid " +
-                        "(use the endpoint `/{connectorType}/config/validate` to get a full list of errors)"), null);
+            if (maybeAddConfigErrors(validateConnectorConfig(config), callback)) {
                 return;
             }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/040fde8e/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 04d14b5..da1edbc 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -56,6 +56,7 @@ import org.powermock.api.easymock.PowerMock;
 import org.powermock.api.easymock.annotation.Mock;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -484,6 +485,46 @@ public class StandaloneHerderTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testCorruptConfig() {
+        Map<String, String> config = new HashMap<>();
+        config.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
+        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSinkConnector.class.getName());
+        Connector connectorMock = PowerMock.createMock(Connector.class);
+        String error = "This is an error in your config!";
+        List<String> errors = new ArrayList<>(singletonList(error));
+        String key = "foo.invalid.key";
+        EasyMock.expect(connectorMock.validate(config)).andReturn(
+            new Config(
+                Arrays.asList(new ConfigValue(key, null, Collections.emptyList(), errors))
+            )
+        );
+        ConfigDef configDef = new ConfigDef();
+        configDef.define(key, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "");
+        EasyMock.expect(connectorMock.config()).andStubReturn(configDef);
+        ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class);
+        EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock);
+        EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString()))
+            .andReturn(connectorMock);
+        Callback<Herder.Created<ConnectorInfo>> callback = PowerMock.createMock(Callback.class);
+        Capture<BadRequestException> capture = Capture.newInstance();
+        callback.onCompletion(
+            EasyMock.capture(capture), EasyMock.isNull(Herder.Created.class)
+        );
+
+        PowerMock.replayAll();
+
+        herder.putConnectorConfig(CONNECTOR_NAME, config, true, callback);
+        assertEquals(
+            capture.getValue().getMessage(),
+            "Connector configuration is invalid and contains the following 1 error(s):\n" +
+                error + "\n" +
+                "You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"
+        );
+
+        PowerMock.verifyAll();
+    }
+
     private void expectAdd(SourceSink sourceSink) throws Exception {
 
         Map<String, String> connectorProps = connectorConfig(sourceSink);