You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2021/02/04 00:12:31 UTC

[kafka] branch 2.7 updated: KAFKA-12259: Use raw config to infer the connector type when returning a connector status response (#10040)

This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 6044498  KAFKA-12259: Use raw config to infer the connector type when returning a connector status response (#10040)
6044498 is described below

commit 6044498775620c73a6a9fd75e29cbb9e4a009df3
Author: Yilong Chang <yi...@confluent.io>
AuthorDate: Wed Feb 3 15:39:32 2021 -0800

    KAFKA-12259: Use raw config to infer the connector type when returning a connector status response (#10040)
    
    Problem: calling the Connect status endpoint returns a 500 error, e.g.
    ```
    {
      "error_code": 500,
      "message": "Could not read properties from file /tmp/somefile.properties"
    }
    ```
    when any of the connectors hits an exception from its config provider. This happens because `AbstractHerder` uses the resolved config to infer the type of the connector, and resolving the config is what triggers the config provider. However, only `connector.class` is needed from the config to infer whether a specific connector is a source or a sink. The endpoint should still return the status of the connector instead of a 500 error.
    
    This change uses the raw config from the config backing store to retrieve the connector class to avoid any variable resolution.
    
    Unit tests have been updated to reflect this change.
    
    Reviewers: Konstantine Karantasis <k....@gmail.com>
---
 .../main/java/org/apache/kafka/connect/runtime/AbstractHerder.java  | 6 +++---
 .../apache/kafka/connect/runtime/distributed/DistributedHerder.java | 4 ++--
 .../apache/kafka/connect/runtime/standalone/StandaloneHerder.java   | 4 ++--
 .../java/org/apache/kafka/connect/runtime/AbstractHerderTest.java   | 6 +++---
 4 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index aa348f2..b73bba9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -229,9 +229,9 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
     }
 
     /*
-     * Retrieves config map by connector name
+     * Retrieves raw config map by connector name.
      */
-    protected abstract Map<String, String> config(String connName);
+    protected abstract Map<String, String> rawConfig(String connName);
 
     @Override
     public Collection<String> connectors() {
@@ -273,7 +273,7 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
 
         Collections.sort(taskStates);
 
-        Map<String, String> conf = config(connName);
+        Map<String, String> conf = rawConfig(connName);
         return new ConnectorStateInfo(connName, connectorState, taskStates,
             conf == null ? ConnectorType.UNKNOWN : connectorTypeForClass(conf.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)));
     }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 266427e..4879a1a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -746,8 +746,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     }
 
     @Override
-    protected Map<String, String> config(String connName) {
-        return configState.connectorConfig(connName);
+    protected Map<String, String> rawConfig(String connName) {
+        return configState.rawConnectorConfig(connName);
     }
 
     @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index db218e0..c6a1b7f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -149,8 +149,8 @@ public class StandaloneHerder extends AbstractHerder {
     }
 
     @Override
-    protected synchronized Map<String, String> config(String connName) {
-        return configState.connectorConfig(connName);
+    protected synchronized Map<String, String> rawConfig(String connName) {
+        return configState.rawConnectorConfig(connName);
     }
 
     @Override
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 3943f9d..1a3e09b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -158,7 +158,7 @@ public class AbstractHerderTest {
             .createMock();
 
         EasyMock.expect(herder.generation()).andStubReturn(generation);
-        EasyMock.expect(herder.config(connector)).andReturn(null);
+        EasyMock.expect(herder.rawConfig(connector)).andReturn(null);
         EasyMock.expect(configStore.snapshot()).andReturn(SNAPSHOT);
         replayAll();
         assertEquals(Collections.singleton(CONN1), new HashSet<>(herder.connectors()));
@@ -182,7 +182,7 @@ public class AbstractHerderTest {
             .createMock();
 
         EasyMock.expect(herder.generation()).andStubReturn(generation);
-        EasyMock.expect(herder.config(connector)).andReturn(null);
+        EasyMock.expect(herder.rawConfig(connector)).andReturn(null);
         EasyMock.expect(statusStore.get(connector))
             .andReturn(new ConnectorStatus(connector, AbstractStatus.State.RUNNING, workerId, generation));
         EasyMock.expect(statusStore.getAll(connector))
@@ -206,7 +206,7 @@ public class AbstractHerderTest {
                 .createMock();
 
         EasyMock.expect(herder.generation()).andStubReturn(generation);
-        EasyMock.expect(herder.config(connector)).andReturn(null);
+        EasyMock.expect(herder.rawConfig(connector)).andReturn(null);
 
         EasyMock.expect(statusStore.get(connector))
                 .andReturn(new ConnectorStatus(connector, AbstractStatus.State.RUNNING, workerId, generation));