You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2019/04/07 14:43:27 UTC
[kafka] branch trunk updated: KAFKA-8058: Fix ConnectClusterStateImpl.connectors() method (#6384)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 71e721f KAFKA-8058: Fix ConnectClusterStateImpl.connectors() method (#6384)
71e721f is described below
commit 71e721f1354739b183fa7c58a3483098c6832928
Author: Chris Egerton <fe...@gmail.com>
AuthorDate: Sun Apr 7 07:43:09 2019 -0700
KAFKA-8058: Fix ConnectClusterStateImpl.connectors() method (#6384)
Fixed the ConnectClusterStateImpl.connectors() method, which now throws an exception on timeout. Added a unit test.
Author: Chris Egerton <ch...@confluent.io>
Reviewers: Magesh Nandakumar <ma...@gmail.com>, Robert Yokota <ra...@gmail.com>, Arjun Satish <wi...@users.noreply.github.com>, Konstantine Karantasis <ko...@confluent.io>, Randall Hauch <rh...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #6384 from C0urante:kafka-8058
---
.../runtime/health/ConnectClusterStateImpl.java | 30 ++++----
.../kafka/connect/runtime/rest/RestServer.java | 9 ++-
.../runtime/rest/resources/ConnectorsResource.java | 2 +-
.../health/ConnectClusterStateImplTest.java | 87 ++++++++++++++++++++++
4 files changed, 113 insertions(+), 15 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
index ea93a72..32f7add 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime.health;
+import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.health.ConnectClusterState;
import org.apache.kafka.connect.health.ConnectorHealth;
import org.apache.kafka.connect.health.ConnectorState;
@@ -24,32 +25,35 @@ import org.apache.kafka.connect.health.ConnectorType;
import org.apache.kafka.connect.health.TaskState;
import org.apache.kafka.connect.runtime.HerderProvider;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
-import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.FutureCallback;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
public class ConnectClusterStateImpl implements ConnectClusterState {
+
+ private final long herderRequestTimeoutMs;
+ private final HerderProvider herderProvider;
- private HerderProvider herderProvider;
-
- public ConnectClusterStateImpl(HerderProvider herderProvider) {
+ public ConnectClusterStateImpl(long connectorsTimeoutMs, HerderProvider herderProvider) {
+ this.herderRequestTimeoutMs = connectorsTimeoutMs;
this.herderProvider = herderProvider;
}
@Override
public Collection<String> connectors() {
- final Collection<String> connectors = new ArrayList<>();
- herderProvider.get().connectors(new Callback<java.util.Collection<String>>() {
- @Override
- public void onCompletion(Throwable error, Collection<String> result) {
- connectors.addAll(result);
- }
- });
- return connectors;
+ FutureCallback<Collection<String>> connectorsCallback = new FutureCallback<>();
+ herderProvider.get().connectors(connectorsCallback);
+ try {
+ return connectorsCallback.get(herderRequestTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ throw new ConnectException("Failed to retrieve list of connectors", e);
+ }
}
@Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 5cc31cd..b592d9a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -23,6 +23,7 @@ import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
import org.apache.kafka.connect.runtime.HerderProvider;
import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.health.ConnectClusterStateImpl;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper;
@@ -308,10 +309,16 @@ public class RestServer {
config.getList(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG),
config, ConnectRestExtension.class);
+ long herderRequestTimeoutMs = ConnectorsResource.REQUEST_TIMEOUT_MS;
+ Integer rebalanceTimeoutMs = config.getInt(DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG);
+ if (rebalanceTimeoutMs != null) {
+ herderRequestTimeoutMs = Math.min(herderRequestTimeoutMs, rebalanceTimeoutMs.longValue());
+ }
+
ConnectRestExtensionContext connectRestExtensionContext =
new ConnectRestExtensionContextImpl(
new ConnectRestConfigurable(resourceConfig),
- new ConnectClusterStateImpl(provider)
+ new ConnectClusterStateImpl(herderRequestTimeoutMs, provider)
);
for (ConnectRestExtension connectRestExtension : connectRestExtensions) {
connectRestExtension.register(connectRestExtensionContext);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index 29a8c39..d4d84f1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -66,7 +66,7 @@ public class ConnectorsResource {
// session timeout to complete, during which we cannot serve some requests. Ideally we could reduce this, but
// we need to consider all possible scenarios this could fail. It might be ok to fail with a timeout in rare cases,
// but currently a worker simply leaving the group can take this long as well.
- private static final long REQUEST_TIMEOUT_MS = 90 * 1000;
+ public static final long REQUEST_TIMEOUT_MS = 90 * 1000;
private final HerderProvider herderProvider;
private final WorkerConfig config;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java
new file mode 100644
index 0000000..e4faea4
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.health;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.HerderProvider;
+import org.apache.kafka.connect.util.Callback;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+@RunWith(PowerMockRunner.class)
+public class ConnectClusterStateImplTest {
+
+ @Mock
+ protected Herder herder;
+ protected HerderProvider herderProvider;
+ protected ConnectClusterStateImpl connectClusterState;
+ protected Collection<String> expectedConnectors;
+ protected long herderRequestTimeoutMs = TimeUnit.SECONDS.toMillis(10);
+
+ @Before
+ public void setUp() {
+ herderProvider = new HerderProvider(herder);
+ connectClusterState = new ConnectClusterStateImpl(herderRequestTimeoutMs, herderProvider);
+ expectedConnectors = Arrays.asList("sink1", "source1", "source2");
+ }
+
+ @Test
+ public void connectors() {
+ Capture<Callback<Collection<String>>> callback = EasyMock.newCapture();
+ herder.connectors(EasyMock.capture(callback));
+ EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
+ @Override
+ public Void answer() {
+ callback.getValue().onCompletion(null, expectedConnectors);
+ return null;
+ }
+ });
+ EasyMock.replay(herder);
+ assertEquals(expectedConnectors, connectClusterState.connectors());
+ }
+
+ @Test
+ public void connectorsFailure() {
+ Capture<Callback<Collection<String>>> callback = EasyMock.newCapture();
+ herder.connectors(EasyMock.capture(callback));
+ EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
+ @Override
+ public Void answer() {
+ Throwable timeout = new TimeoutException();
+ callback.getValue().onCompletion(timeout, null);
+ return null;
+ }
+ });
+ EasyMock.replay(herder);
+ assertThrows(ConnectException.class, connectClusterState::connectors);
+ }
+}