You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2019/05/10 01:28:13 UTC
[kafka] branch trunk updated: KAFKA-8231: Expansion of ConnectClusterState interface (#6584)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 36a5aba KAFKA-8231: Expansion of ConnectClusterState interface (#6584)
36a5aba is described below
commit 36a5aba4ecf56631372ad12d5c67af8fa3de05f8
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Thu May 9 18:27:59 2019 -0700
KAFKA-8231: Expansion of ConnectClusterState interface (#6584)
Expand ConnectClusterState interface and implementation with methods that provide the immutable cluster details and the connector configuration. This includes unit tests for the new methods.
Author: Chris Egerton <ce...@oberlin.edu>
Reviews: Arjun Satish <ar...@confluent.io>, Konstantine Karantasis <ko...@confluent.io>, Randall Hauch <rh...@gmail.com>
---
.../connect/health/ConnectClusterDetails.java | 32 ++++++++++++++++++++
.../kafka/connect/health/ConnectClusterState.java | 30 +++++++++++++++++--
.../runtime/health/ConnectClusterDetailsImpl.java | 34 +++++++++++++++++++++
.../runtime/health/ConnectClusterStateImpl.java | 29 ++++++++++++++++--
.../kafka/connect/runtime/rest/RestServer.java | 8 ++++-
.../health/ConnectClusterStateImplTest.java | 35 ++++++++++++++++++++--
.../kafka/connect/runtime/rest/RestServerTest.java | 6 +++-
7 files changed, 165 insertions(+), 9 deletions(-)
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterDetails.java b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterDetails.java
new file mode 100644
index 0000000..edde6ff
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterDetails.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.health;
+
+/**
+ * Provides immutable Connect cluster information, such as the ID of the backing Kafka cluster. The
+ * Connect framework provides the implementation for this interface.
+ */
+public interface ConnectClusterDetails {
+
+ /**
+ * Get the cluster ID of the Kafka cluster backing this Connect cluster.
+ *
+ * @return the cluster ID of the Kafka cluster backing this Connect cluster
+ **/
+ String kafkaClusterId();
+}
\ No newline at end of file
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterState.java b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterState.java
index d4292ef..753ee1a 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterState.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterState.java
@@ -18,10 +18,13 @@
package org.apache.kafka.connect.health;
import java.util.Collection;
+import java.util.Map;
/**
- * Provides the ability to lookup connector metadata and its health. This is made available to the {@link org.apache.kafka.connect.rest.ConnectRestExtension}
- * implementations. The Connect framework provides the implementation for this interface.
+ * Provides the ability to lookup connector metadata, including status and configurations, as well
+ * as immutable cluster information such as Kafka cluster ID. This is made available to
+ * {@link org.apache.kafka.connect.rest.ConnectRestExtension} implementations. The Connect framework
+ * provides the implementation for this interface.
*/
public interface ConnectClusterState {
@@ -43,4 +46,27 @@ public interface ConnectClusterState {
* @throws org.apache.kafka.connect.errors.NotFoundException if the requested connector can't be found
*/
ConnectorHealth connectorHealth(String connName);
+
+ /**
+ * Lookup the current configuration of a connector. This provides the current snapshot of configuration by querying the underlying
+ * herder. A connector returned by previous invocation of {@link #connectors()} may no longer be available and could result in {@link
+ * org.apache.kafka.connect.errors.NotFoundException}.
+ *
+ * @param connName name of the connector
+ * @return the configuration of the connector for the connector name
+ * @throws org.apache.kafka.connect.errors.NotFoundException if the requested connector can't be found
+ * @throws java.lang.UnsupportedOperationException if the default implementation has not been overridden
+ */
+ default Map<String, String> connectorConfig(String connName) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Get details about the setup of the Connect cluster.
+ * @return a {@link ConnectClusterDetails} object containing information about the cluster
+ * @throws java.lang.UnsupportedOperationException if the default implementation has not been overridden
+ **/
+ default ConnectClusterDetails clusterDetails() {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterDetailsImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterDetailsImpl.java
new file mode 100644
index 0000000..09f09bd
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterDetailsImpl.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.health;
+
+import org.apache.kafka.connect.health.ConnectClusterDetails;
+
+public class ConnectClusterDetailsImpl implements ConnectClusterDetails {
+
+ private final String kafkaClusterId;
+
+ public ConnectClusterDetailsImpl(String kafkaClusterId) {
+ this.kafkaClusterId = kafkaClusterId;
+ }
+
+ @Override
+ public String kafkaClusterId() {
+ return kafkaClusterId;
+ }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
index e3a4833..c3e950e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
@@ -18,6 +18,7 @@
package org.apache.kafka.connect.runtime.health;
import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.health.ConnectClusterDetails;
import org.apache.kafka.connect.health.ConnectClusterState;
import org.apache.kafka.connect.health.ConnectorHealth;
import org.apache.kafka.connect.health.ConnectorState;
@@ -38,10 +39,16 @@ import java.util.concurrent.TimeoutException;
public class ConnectClusterStateImpl implements ConnectClusterState {
private final long herderRequestTimeoutMs;
+ private final ConnectClusterDetails clusterDetails;
private final Herder herder;
- public ConnectClusterStateImpl(long connectorsTimeoutMs, Herder herder) {
+ public ConnectClusterStateImpl(
+ long connectorsTimeoutMs,
+ ConnectClusterDetails clusterDetails,
+ Herder herder
+ ) {
this.herderRequestTimeoutMs = connectorsTimeoutMs;
+ this.clusterDetails = clusterDetails;
this.herder = herder;
}
@@ -58,7 +65,6 @@ public class ConnectClusterStateImpl implements ConnectClusterState {
@Override
public ConnectorHealth connectorHealth(String connName) {
-
ConnectorStateInfo state = herder.connectorStatus(connName);
ConnectorState connectorState = new ConnectorState(
state.connector().state(),
@@ -75,6 +81,25 @@ public class ConnectClusterStateImpl implements ConnectClusterState {
return connectorHealth;
}
+ @Override
+ public Map<String, String> connectorConfig(String connName) {
+ FutureCallback<Map<String, String>> connectorConfigCallback = new FutureCallback<>();
+ herder.connectorConfig(connName, connectorConfigCallback);
+ try {
+ return connectorConfigCallback.get(herderRequestTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ throw new ConnectException(
+ String.format("Failed to retrieve configuration for connector '%s'", connName),
+ e
+ );
+ }
+ }
+
+ @Override
+ public ConnectClusterDetails clusterDetails() {
+ return clusterDetails;
+ }
+
private Map<Integer, TaskState> taskStates(List<ConnectorStateInfo.TaskState> states) {
Map<Integer, TaskState> taskStates = new HashMap<>();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 067fd88..d76cfff 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -19,10 +19,12 @@ package org.apache.kafka.connect.runtime.rest;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.health.ConnectClusterDetails;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.health.ConnectClusterDetailsImpl;
import org.apache.kafka.connect.runtime.health.ConnectClusterStateImpl;
import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource;
@@ -328,10 +330,14 @@ public class RestServer {
herderRequestTimeoutMs = Math.min(herderRequestTimeoutMs, rebalanceTimeoutMs.longValue());
}
+ ConnectClusterDetails connectClusterDetails = new ConnectClusterDetailsImpl(
+ herder.kafkaClusterId()
+ );
+
ConnectRestExtensionContext connectRestExtensionContext =
new ConnectRestExtensionContextImpl(
new ConnectRestConfigurable(resourceConfig),
- new ConnectClusterStateImpl(herderRequestTimeoutMs, herder)
+ new ConnectClusterStateImpl(herderRequestTimeoutMs, connectClusterDetails, herder)
);
for (ConnectRestExtension connectRestExtension : connectRestExtensions) {
connectRestExtension.register(connectRestExtensionContext);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java
index 4752075..d8984f0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java
@@ -30,6 +30,8 @@ import org.powermock.modules.junit4.PowerMockRunner;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -38,17 +40,22 @@ import static org.junit.Assert.assertThrows;
@RunWith(PowerMockRunner.class)
public class ConnectClusterStateImplTest {
-
+ protected static final String KAFKA_CLUSTER_ID = "franzwashere";
+
@Mock
protected Herder herder;
protected ConnectClusterStateImpl connectClusterState;
- protected Collection<String> expectedConnectors;
protected long herderRequestTimeoutMs = TimeUnit.SECONDS.toMillis(10);
+ protected Collection<String> expectedConnectors;
@Before
public void setUp() {
- connectClusterState = new ConnectClusterStateImpl(herderRequestTimeoutMs, herder);
expectedConnectors = Arrays.asList("sink1", "source1", "source2");
+ connectClusterState = new ConnectClusterStateImpl(
+ herderRequestTimeoutMs,
+ new ConnectClusterDetailsImpl(KAFKA_CLUSTER_ID),
+ herder
+ );
}
@Test
@@ -67,6 +74,28 @@ public class ConnectClusterStateImplTest {
}
@Test
+ public void connectorConfig() {
+ final String connName = "sink6";
+ final Map<String, String> expectedConfig = Collections.singletonMap("key", "value");
+ Capture<Callback<Map<String, String>>> callback = EasyMock.newCapture();
+ herder.connectorConfig(EasyMock.eq(connName), EasyMock.capture(callback));
+ EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
+ @Override
+ public Void answer() {
+ callback.getValue().onCompletion(null, expectedConfig);
+ return null;
+ }
+ });
+ EasyMock.replay(herder);
+ assertEquals(expectedConfig, connectClusterState.connectorConfig(connName));
+ }
+
+ @Test
+ public void kafkaClusterId() {
+ assertEquals(KAFKA_CLUSTER_ID, connectClusterState.clusterDetails().kafkaClusterId());
+ }
+
+ @Test
public void connectorsFailure() {
Capture<Callback<Collection<String>>> callback = EasyMock.newCapture();
herder.connectors(EasyMock.capture(callback));
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
index 3d297b7..91aa5e7 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
@@ -62,6 +62,8 @@ public class RestServerTest {
private Plugins plugins;
private RestServer server;
+ protected static final String KAFKA_CLUSTER_ID = "Xbafgnagvar";
+
@After
public void tearDown() {
server.stop();
@@ -166,6 +168,7 @@ public class RestServerTest {
Map<String, String> configMap = new HashMap<>(baseWorkerProps());
DistributedConfig workerConfig = new DistributedConfig(configMap);
+ EasyMock.expect(herder.kafkaClusterId()).andReturn(KAFKA_CLUSTER_ID);
EasyMock.expect(herder.plugins()).andStubReturn(plugins);
EasyMock.expect(plugins.newPlugins(Collections.emptyList(),
workerConfig,
@@ -202,6 +205,7 @@ public class RestServerTest {
workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_METHODS_CONFIG, method);
WorkerConfig workerConfig = new DistributedConfig(workerProps);
+ EasyMock.expect(herder.kafkaClusterId()).andReturn(KAFKA_CLUSTER_ID);
EasyMock.expect(herder.plugins()).andStubReturn(plugins);
EasyMock.expect(plugins.newPlugins(Collections.emptyList(),
workerConfig,
@@ -260,7 +264,7 @@ public class RestServerTest {
workerProps.put("offset.storage.file.filename", "/tmp");
WorkerConfig workerConfig = new StandaloneConfig(workerProps);
-
+ EasyMock.expect(herder.kafkaClusterId()).andReturn(KAFKA_CLUSTER_ID);
EasyMock.expect(herder.plugins()).andStubReturn(plugins);
EasyMock.expect(plugins.newPlugins(Collections.emptyList(),
workerConfig,