You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2019/04/07 14:43:27 UTC

[kafka] branch trunk updated: KAFKA-8058: Fix ConnectClusterStateImpl.connectors() method (#6384)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 71e721f  KAFKA-8058: Fix ConnectClusterStateImpl.connectors() method (#6384)
71e721f is described below

commit 71e721f1354739b183fa7c58a3483098c6832928
Author: Chris Egerton <fe...@gmail.com>
AuthorDate: Sun Apr 7 07:43:09 2019 -0700

    KAFKA-8058: Fix ConnectClusterStateImpl.connectors() method (#6384)
    
    Fixed the ConnectClusterStateImpl.connectors() method and throw an exception on timeout. Added unit test.
    
    Author: Chris Egerton <ch...@confluent.io>
    
    Reviewers: Magesh Nandakumar <ma...@gmail.com>, Robert Yokota <ra...@gmail.com>, Arjun Satish <wi...@users.noreply.github.com>, Konstantine Karantasis <ko...@confluent.io>, Randall Hauch <rh...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>
    
    Closes #6384 from C0urante:kafka-8058
---
 .../runtime/health/ConnectClusterStateImpl.java    | 30 ++++----
 .../kafka/connect/runtime/rest/RestServer.java     |  9 ++-
 .../runtime/rest/resources/ConnectorsResource.java |  2 +-
 .../health/ConnectClusterStateImplTest.java        | 87 ++++++++++++++++++++++
 4 files changed, 113 insertions(+), 15 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
index ea93a72..32f7add 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.connect.runtime.health;
 
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.health.ConnectClusterState;
 import org.apache.kafka.connect.health.ConnectorHealth;
 import org.apache.kafka.connect.health.ConnectorState;
@@ -24,32 +25,35 @@ import org.apache.kafka.connect.health.ConnectorType;
 import org.apache.kafka.connect.health.TaskState;
 import org.apache.kafka.connect.runtime.HerderProvider;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
-import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.FutureCallback;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 public class ConnectClusterStateImpl implements ConnectClusterState {
+    
+    private final long herderRequestTimeoutMs;
+    private final HerderProvider herderProvider;
 
-    private HerderProvider herderProvider;
-
-    public ConnectClusterStateImpl(HerderProvider herderProvider) {
+    public ConnectClusterStateImpl(long connectorsTimeoutMs, HerderProvider herderProvider) {
+        this.herderRequestTimeoutMs = connectorsTimeoutMs;
         this.herderProvider = herderProvider;
     }
 
     @Override
     public Collection<String> connectors() {
-        final Collection<String> connectors = new ArrayList<>();
-        herderProvider.get().connectors(new Callback<java.util.Collection<String>>() {
-            @Override
-            public void onCompletion(Throwable error, Collection<String> result) {
-                connectors.addAll(result);
-            }
-        });
-        return connectors;
+        FutureCallback<Collection<String>> connectorsCallback = new FutureCallback<>();
+        herderProvider.get().connectors(connectorsCallback);
+        try {
+            return connectorsCallback.get(herderRequestTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            throw new ConnectException("Failed to retrieve list of connectors", e);
+        }
     }
 
     @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 5cc31cd..b592d9a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -23,6 +23,7 @@ import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
 import org.apache.kafka.connect.runtime.HerderProvider;
 import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.runtime.health.ConnectClusterStateImpl;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper;
@@ -308,10 +309,16 @@ public class RestServer {
             config.getList(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG),
             config, ConnectRestExtension.class);
 
+        long herderRequestTimeoutMs = ConnectorsResource.REQUEST_TIMEOUT_MS;
+        Integer rebalanceTimeoutMs = config.getInt(DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG);
+        if (rebalanceTimeoutMs != null) {
+            herderRequestTimeoutMs = Math.min(herderRequestTimeoutMs, rebalanceTimeoutMs.longValue());
+        }
+
         ConnectRestExtensionContext connectRestExtensionContext =
             new ConnectRestExtensionContextImpl(
                 new ConnectRestConfigurable(resourceConfig),
-                new ConnectClusterStateImpl(provider)
+                new ConnectClusterStateImpl(herderRequestTimeoutMs, provider)
             );
         for (ConnectRestExtension connectRestExtension : connectRestExtensions) {
             connectRestExtension.register(connectRestExtensionContext);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index 29a8c39..d4d84f1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -66,7 +66,7 @@ public class ConnectorsResource {
     // session timeout to complete, during which we cannot serve some requests. Ideally we could reduce this, but
     // we need to consider all possible scenarios this could fail. It might be ok to fail with a timeout in rare cases,
     // but currently a worker simply leaving the group can take this long as well.
-    private static final long REQUEST_TIMEOUT_MS = 90 * 1000;
+    public static final long REQUEST_TIMEOUT_MS = 90 * 1000;
 
     private final HerderProvider herderProvider;
     private final WorkerConfig config;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java
new file mode 100644
index 0000000..e4faea4
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.health;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.HerderProvider;
+import org.apache.kafka.connect.util.Callback;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+@RunWith(PowerMockRunner.class)
+public class ConnectClusterStateImplTest {
+  
+    @Mock
+    protected Herder herder;
+    protected HerderProvider herderProvider;
+    protected ConnectClusterStateImpl connectClusterState;
+    protected Collection<String> expectedConnectors;
+    protected long herderRequestTimeoutMs = TimeUnit.SECONDS.toMillis(10);
+    
+    @Before
+    public void setUp() {
+        herderProvider = new HerderProvider(herder);
+        connectClusterState = new ConnectClusterStateImpl(herderRequestTimeoutMs, herderProvider);
+        expectedConnectors = Arrays.asList("sink1", "source1", "source2");
+    }
+    
+    @Test
+    public void connectors() {
+        Capture<Callback<Collection<String>>> callback = EasyMock.newCapture();
+        herder.connectors(EasyMock.capture(callback));
+        EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
+            @Override
+            public Void answer() {
+                callback.getValue().onCompletion(null, expectedConnectors);
+                return null;
+            }
+        });
+        EasyMock.replay(herder);
+        assertEquals(expectedConnectors, connectClusterState.connectors());
+    }
+
+    @Test
+    public void connectorsFailure() {
+        Capture<Callback<Collection<String>>> callback = EasyMock.newCapture();
+        herder.connectors(EasyMock.capture(callback));
+        EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
+            @Override
+            public Void answer() {
+                Throwable timeout = new TimeoutException();
+                callback.getValue().onCompletion(timeout, null);
+                return null;
+            }
+        });
+        EasyMock.replay(herder);
+        assertThrows(ConnectException.class, connectClusterState::connectors);
+    }
+}