You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2019/05/10 01:28:13 UTC

[kafka] branch trunk updated: KAFKA-8231: Expansion of ConnectClusterState interface (#6584)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 36a5aba  KAFKA-8231: Expansion of ConnectClusterState interface (#6584)
36a5aba is described below

commit 36a5aba4ecf56631372ad12d5c67af8fa3de05f8
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Thu May 9 18:27:59 2019 -0700

    KAFKA-8231: Expansion of ConnectClusterState interface (#6584)
    
    Expand ConnectClusterState interface and implementation with methods that provide the immutable cluster details and the connector configuration. This includes unit tests for the new methods.
    
    Author: Chris Egerton <ce...@oberlin.edu>
    Reviews: Arjun Satish <ar...@confluent.io>, Konstantine Karantasis <ko...@confluent.io>, Randall Hauch <rh...@gmail.com>
---
 .../connect/health/ConnectClusterDetails.java      | 32 ++++++++++++++++++++
 .../kafka/connect/health/ConnectClusterState.java  | 30 +++++++++++++++++--
 .../runtime/health/ConnectClusterDetailsImpl.java  | 34 +++++++++++++++++++++
 .../runtime/health/ConnectClusterStateImpl.java    | 29 ++++++++++++++++--
 .../kafka/connect/runtime/rest/RestServer.java     |  8 ++++-
 .../health/ConnectClusterStateImplTest.java        | 35 ++++++++++++++++++++--
 .../kafka/connect/runtime/rest/RestServerTest.java |  6 +++-
 7 files changed, 165 insertions(+), 9 deletions(-)

diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterDetails.java b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterDetails.java
new file mode 100644
index 0000000..edde6ff
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterDetails.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.health;
+
+/**
+ * Provides immutable Connect cluster information, such as the ID of the backing Kafka cluster. The
+ * Connect framework provides the implementation for this interface.
+ */
+public interface ConnectClusterDetails {
+
+    /**
+     * Get the cluster ID of the Kafka cluster backing this Connect cluster.
+     * 
+     * @return the cluster ID of the Kafka cluster backing this Connect cluster
+     **/
+    String kafkaClusterId();
+}
\ No newline at end of file
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterState.java b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterState.java
index d4292ef..753ee1a 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterState.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterState.java
@@ -18,10 +18,13 @@
 package org.apache.kafka.connect.health;
 
 import java.util.Collection;
+import java.util.Map;
 
 /**
- * Provides the ability to lookup connector metadata and its health. This is made available to the {@link org.apache.kafka.connect.rest.ConnectRestExtension}
- * implementations. The Connect framework provides the implementation for this interface.
+ * Provides the ability to lookup connector metadata, including status and configurations, as well
+ * as immutable cluster information such as Kafka cluster ID. This is made available to
+ * {@link org.apache.kafka.connect.rest.ConnectRestExtension} implementations. The Connect framework
+ * provides the implementation for this interface.
  */
 public interface ConnectClusterState {
 
@@ -43,4 +46,27 @@ public interface ConnectClusterState {
      * @throws org.apache.kafka.connect.errors.NotFoundException if the requested connector can't be found
      */
     ConnectorHealth connectorHealth(String connName);
+
+    /**
+     * Lookup the current configuration of a connector. This provides the current snapshot of configuration by querying the underlying
+     * herder. A connector returned by previous invocation of {@link #connectors()} may no longer be available and could result in {@link
+     * org.apache.kafka.connect.errors.NotFoundException}.
+     *
+     * @param connName name of the connector
+     * @return the configuration of the connector for the connector name
+     * @throws org.apache.kafka.connect.errors.NotFoundException if the requested connector can't be found
+     * @throws java.lang.UnsupportedOperationException if the default implementation has not been overridden
+     */
+    default Map<String, String> connectorConfig(String connName) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Get details about the setup of the Connect cluster.
+     * @return a {@link ConnectClusterDetails} object containing information about the cluster
+     * @throws java.lang.UnsupportedOperationException if the default implementation has not been overridden
+     **/
+    default ConnectClusterDetails clusterDetails() {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterDetailsImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterDetailsImpl.java
new file mode 100644
index 0000000..09f09bd
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterDetailsImpl.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.health;
+
+import org.apache.kafka.connect.health.ConnectClusterDetails;
+
+public class ConnectClusterDetailsImpl implements ConnectClusterDetails {
+
+    private final String kafkaClusterId;
+
+    public ConnectClusterDetailsImpl(String kafkaClusterId) {
+        this.kafkaClusterId = kafkaClusterId;
+    }
+
+    @Override
+    public String kafkaClusterId() {
+        return kafkaClusterId;
+    }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
index e3a4833..c3e950e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.connect.runtime.health;
 
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.health.ConnectClusterDetails;
 import org.apache.kafka.connect.health.ConnectClusterState;
 import org.apache.kafka.connect.health.ConnectorHealth;
 import org.apache.kafka.connect.health.ConnectorState;
@@ -38,10 +39,16 @@ import java.util.concurrent.TimeoutException;
 public class ConnectClusterStateImpl implements ConnectClusterState {
     
     private final long herderRequestTimeoutMs;
+    private final ConnectClusterDetails clusterDetails;
     private final Herder herder;
 
-    public ConnectClusterStateImpl(long connectorsTimeoutMs, Herder herder) {
+    public ConnectClusterStateImpl(
+        long connectorsTimeoutMs,
+        ConnectClusterDetails clusterDetails,
+        Herder herder
+    ) {
         this.herderRequestTimeoutMs = connectorsTimeoutMs;
+        this.clusterDetails = clusterDetails;
         this.herder = herder;
     }
 
@@ -58,7 +65,6 @@ public class ConnectClusterStateImpl implements ConnectClusterState {
 
     @Override
     public ConnectorHealth connectorHealth(String connName) {
-
         ConnectorStateInfo state = herder.connectorStatus(connName);
         ConnectorState connectorState = new ConnectorState(
             state.connector().state(),
@@ -75,6 +81,25 @@ public class ConnectClusterStateImpl implements ConnectClusterState {
         return connectorHealth;
     }
 
+    @Override
+    public Map<String, String> connectorConfig(String connName) {
+        FutureCallback<Map<String, String>> connectorConfigCallback = new FutureCallback<>();
+        herder.connectorConfig(connName, connectorConfigCallback);
+        try {
+            return connectorConfigCallback.get(herderRequestTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            throw new ConnectException(
+                String.format("Failed to retrieve configuration for connector '%s'", connName),
+                e
+            );
+        }
+    }
+
+    @Override
+    public ConnectClusterDetails clusterDetails() {
+        return clusterDetails;
+    }
+
     private Map<Integer, TaskState> taskStates(List<ConnectorStateInfo.TaskState> states) {
 
         Map<Integer, TaskState> taskStates = new HashMap<>();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 067fd88..d76cfff 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -19,10 +19,12 @@ package org.apache.kafka.connect.runtime.rest;
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.health.ConnectClusterDetails;
 import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.health.ConnectClusterDetailsImpl;
 import org.apache.kafka.connect.runtime.health.ConnectClusterStateImpl;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper;
 import org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource;
@@ -328,10 +330,14 @@ public class RestServer {
             herderRequestTimeoutMs = Math.min(herderRequestTimeoutMs, rebalanceTimeoutMs.longValue());
         }
 
+        ConnectClusterDetails connectClusterDetails = new ConnectClusterDetailsImpl(
+            herder.kafkaClusterId()
+        );
+
         ConnectRestExtensionContext connectRestExtensionContext =
             new ConnectRestExtensionContextImpl(
                 new ConnectRestConfigurable(resourceConfig),
-                new ConnectClusterStateImpl(herderRequestTimeoutMs, herder)
+                new ConnectClusterStateImpl(herderRequestTimeoutMs, connectClusterDetails, herder)
             );
         for (ConnectRestExtension connectRestExtension : connectRestExtensions) {
             connectRestExtension.register(connectRestExtensionContext);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java
index 4752075..d8984f0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java
@@ -30,6 +30,8 @@ import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -38,17 +40,22 @@ import static org.junit.Assert.assertThrows;
 
 @RunWith(PowerMockRunner.class)
 public class ConnectClusterStateImplTest {
-  
+    protected static final String KAFKA_CLUSTER_ID = "franzwashere";
+
     @Mock
     protected Herder herder;
     protected ConnectClusterStateImpl connectClusterState;
-    protected Collection<String> expectedConnectors;
     protected long herderRequestTimeoutMs = TimeUnit.SECONDS.toMillis(10);
+    protected Collection<String> expectedConnectors;
     
     @Before
     public void setUp() {
-        connectClusterState = new ConnectClusterStateImpl(herderRequestTimeoutMs, herder);
         expectedConnectors = Arrays.asList("sink1", "source1", "source2");
+        connectClusterState = new ConnectClusterStateImpl(
+            herderRequestTimeoutMs,
+            new ConnectClusterDetailsImpl(KAFKA_CLUSTER_ID),
+            herder
+        );
     }
     
     @Test
@@ -67,6 +74,28 @@ public class ConnectClusterStateImplTest {
     }
 
     @Test
+    public void connectorConfig() {
+        final String connName = "sink6";
+        final Map<String, String> expectedConfig = Collections.singletonMap("key", "value");
+        Capture<Callback<Map<String, String>>> callback = EasyMock.newCapture();
+        herder.connectorConfig(EasyMock.eq(connName), EasyMock.capture(callback));
+        EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
+            @Override
+            public Void answer() {
+                callback.getValue().onCompletion(null, expectedConfig);
+                return null;
+            }
+        });
+        EasyMock.replay(herder);
+        assertEquals(expectedConfig, connectClusterState.connectorConfig(connName));
+    }
+
+    @Test
+    public void kafkaClusterId() {
+        assertEquals(KAFKA_CLUSTER_ID, connectClusterState.clusterDetails().kafkaClusterId());
+    }
+
+    @Test
     public void connectorsFailure() {
         Capture<Callback<Collection<String>>> callback = EasyMock.newCapture();
         herder.connectors(EasyMock.capture(callback));
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
index 3d297b7..91aa5e7 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
@@ -62,6 +62,8 @@ public class RestServerTest {
     private Plugins plugins;
     private RestServer server;
 
+    protected static final String KAFKA_CLUSTER_ID = "Xbafgnagvar";
+
     @After
     public void tearDown() {
         server.stop();
@@ -166,6 +168,7 @@ public class RestServerTest {
         Map<String, String> configMap = new HashMap<>(baseWorkerProps());
         DistributedConfig workerConfig = new DistributedConfig(configMap);
 
+        EasyMock.expect(herder.kafkaClusterId()).andReturn(KAFKA_CLUSTER_ID);
         EasyMock.expect(herder.plugins()).andStubReturn(plugins);
         EasyMock.expect(plugins.newPlugins(Collections.emptyList(),
             workerConfig,
@@ -202,6 +205,7 @@ public class RestServerTest {
         workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_METHODS_CONFIG, method);
         WorkerConfig workerConfig = new DistributedConfig(workerProps);
 
+        EasyMock.expect(herder.kafkaClusterId()).andReturn(KAFKA_CLUSTER_ID);
         EasyMock.expect(herder.plugins()).andStubReturn(plugins);
         EasyMock.expect(plugins.newPlugins(Collections.emptyList(),
                                            workerConfig,
@@ -260,7 +264,7 @@ public class RestServerTest {
         workerProps.put("offset.storage.file.filename", "/tmp");
         WorkerConfig workerConfig = new StandaloneConfig(workerProps);
 
-
+        EasyMock.expect(herder.kafkaClusterId()).andReturn(KAFKA_CLUSTER_ID);
         EasyMock.expect(herder.plugins()).andStubReturn(plugins);
         EasyMock.expect(plugins.newPlugins(Collections.emptyList(),
             workerConfig,