You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2017/04/03 20:09:31 UTC
kafka git commit: KAFKA-4878: Improved Invalid Connect Config Error Message
Repository: kafka
Updated Branches:
refs/heads/trunk 800d29648 -> 040fde8ec
KAFKA-4878: Improved Invalid Connect Config Error Message
Addresses https://issues.apache.org/jira/browse/KAFKA-4878
* Adjusted the error message to explicitly state errors and their number
* DRYed up the message-generation logic so it is shared between the standalone and distributed herders
Example:
Misspelling two config keys (`namse` instead of `name`, `fisle` instead of `file`) in the file source config:
```
namse=local-file-source
connector.class=FileStreamSource
tasks.max=1
fisle=test.txt
topic=connect-test
```
Produces:
```
[2017-03-22 08:57:11,896] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:99)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
Missing required configuration "file" which has no default value.
Missing required configuration "name" which has no default value.
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
```
Author: Armin Braun <me...@obrown.io>
Reviewers: Gwen Shapira, Konstantine Karantasis, Ewen Cheslack-Postava
Closes #2722 from original-brownbear/KAFKA-4878
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/040fde8e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/040fde8e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/040fde8e
Branch: refs/heads/trunk
Commit: 040fde8ec1f94856799c6bb47fa23a73c20bda66
Parents: 800d296
Author: Armin Braun <me...@obrown.io>
Authored: Mon Apr 3 13:09:10 2017 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Mon Apr 3 13:09:10 2017 -0700
----------------------------------------------------------------------
.../kafka/connect/runtime/AbstractHerder.java | 36 +++++++++++++++++
.../runtime/distributed/DistributedHerder.java | 7 +---
.../runtime/standalone/StandaloneHerder.java | 8 +---
.../standalone/StandaloneHerderTest.java | 41 ++++++++++++++++++++
4 files changed, 80 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/040fde8e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 6a16185..9e5342e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -27,11 +27,13 @@ import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import java.io.ByteArrayOutputStream;
@@ -338,6 +340,40 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
}
}
+ /**
+ * Checks a given {@link ConfigInfos} for validation error messages and adds an exception
+ * to the given {@link Callback} if any were found.
+ *
+ * @param configInfos configInfos to read Errors from
+ * @param callback callback to add config error exception to
+ * @return true if errors were found in the config
+ */
+ protected final boolean maybeAddConfigErrors(
+ ConfigInfos configInfos,
+ Callback<Created<ConnectorInfo>> callback
+ ) {
+ int errors = configInfos.errorCount();
+ boolean hasErrors = errors > 0;
+ if (hasErrors) {
+ StringBuilder messages = new StringBuilder();
+ messages.append("Connector configuration is invalid and contains the following ")
+ .append(errors).append(" error(s):");
+ for (ConfigInfo configInfo : configInfos.values()) {
+ for (String msg : configInfo.configValue().errors()) {
+ messages.append('\n').append(msg);
+ }
+ }
+ callback.onCompletion(
+ new BadRequestException(
+ messages.append(
+ "\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"
+ ).toString()
+ ), null
+ );
+ }
+ return hasErrors;
+ }
+
private String trace(Throwable t) {
ByteArrayOutputStream output = new ByteArrayOutputStream();
try {
http://git-wip-us.apache.org/repos/asf/kafka/blob/040fde8e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index a76fd03..cf30aca 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -35,10 +35,8 @@ import org.apache.kafka.connect.runtime.SourceConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.rest.RestServer;
-import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
-import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
@@ -507,10 +505,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
new Callable<Void>() {
@Override
public Void call() throws Exception {
- ConfigInfos validatedConfig = validateConnectorConfig(config);
- if (validatedConfig.errorCount() > 0) {
- callback.onCompletion(new BadRequestException("Connector configuration is invalid " +
- "(use the endpoint `/{connectorType}/config/validate` to get a full list of errors)"), null);
+ if (maybeAddConfigErrors(validateConnectorConfig(config), callback)) {
return null;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/040fde8e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index ca0130e..9c8c7ae 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -27,10 +27,8 @@ import org.apache.kafka.connect.runtime.SourceConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
-import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
-import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.MemoryConfigBackingStore;
import org.apache.kafka.connect.storage.MemoryStatusBackingStore;
@@ -45,6 +43,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
+
/**
* Single process, in-memory "herder". Useful for a standalone Kafka Connect process.
*/
@@ -155,10 +154,7 @@ public class StandaloneHerder extends AbstractHerder {
boolean allowReplace,
final Callback<Created<ConnectorInfo>> callback) {
try {
- ConfigInfos validatedConfig = validateConnectorConfig(config);
- if (validatedConfig.errorCount() > 0) {
- callback.onCompletion(new BadRequestException("Connector configuration is invalid " +
- "(use the endpoint `/{connectorType}/config/validate` to get a full list of errors)"), null);
+ if (maybeAddConfigErrors(validateConnectorConfig(config), callback)) {
return;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/040fde8e/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 04d14b5..da1edbc 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -56,6 +56,7 @@ import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.modules.junit4.PowerMockRunner;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -484,6 +485,46 @@ public class StandaloneHerderTest {
PowerMock.verifyAll();
}
+ @Test
+ public void testCorruptConfig() {
+ Map<String, String> config = new HashMap<>();
+ config.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
+ config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSinkConnector.class.getName());
+ Connector connectorMock = PowerMock.createMock(Connector.class);
+ String error = "This is an error in your config!";
+ List<String> errors = new ArrayList<>(singletonList(error));
+ String key = "foo.invalid.key";
+ EasyMock.expect(connectorMock.validate(config)).andReturn(
+ new Config(
+ Arrays.asList(new ConfigValue(key, null, Collections.emptyList(), errors))
+ )
+ );
+ ConfigDef configDef = new ConfigDef();
+ configDef.define(key, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "");
+ EasyMock.expect(connectorMock.config()).andStubReturn(configDef);
+ ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class);
+ EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock);
+ EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString()))
+ .andReturn(connectorMock);
+ Callback<Herder.Created<ConnectorInfo>> callback = PowerMock.createMock(Callback.class);
+ Capture<BadRequestException> capture = Capture.newInstance();
+ callback.onCompletion(
+ EasyMock.capture(capture), EasyMock.isNull(Herder.Created.class)
+ );
+
+ PowerMock.replayAll();
+
+ herder.putConnectorConfig(CONNECTOR_NAME, config, true, callback);
+ assertEquals(
+ capture.getValue().getMessage(),
+ "Connector configuration is invalid and contains the following 1 error(s):\n" +
+ error + "\n" +
+ "You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"
+ );
+
+ PowerMock.verifyAll();
+ }
+
private void expectAdd(SourceSink sourceSink) throws Exception {
Map<String, String> connectorProps = connectorConfig(sourceSink);