You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/04/23 22:27:34 UTC

[kafka] branch 2.4 updated: KAFKA-9883: Add better error message when REST API forwards a request and leader is not known (#8536)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new c6954b7  KAFKA-9883: Add better error message when REST API forwards a request and leader is not known (#8536)
c6954b7 is described below

commit c6954b79cadaafe3d439ed006c352ec386c623b4
Author: Randall Hauch <rh...@gmail.com>
AuthorDate: Thu Apr 23 13:37:45 2020 -0500

    KAFKA-9883: Add better error message when REST API forwards a request and leader is not known (#8536)
    
    When the Connect worker forwards a REST API request to the leader, it might get back a `RequestTargetException` that suggests the worker should forward the request to a different worker. This can happen when the leader changes, and the worker that receives the original request forwards the request to the worker that it thinks is the current leader, but that worker is not the current leader. In this case. In most cases, the worker that received the forwarded request includes the URL of [...]
    
    When this rare case happens, the user gets a null pointer exception in their response and the NPE is logged. Instead, the worker should catch this condition and provide a more useful error message that is similar to other existing error messages that might occur.
    
    Added a unit test that verifies this corner case is caught and this particular NPE does not occur.
    
    Author: Randall Hauch <rh...@gmail.com>
    Reviewer: Konstantine Karantasis <ko...@confluent.io>
---
 .../runtime/rest/resources/ConnectorsResource.java    |  9 ++++++++-
 .../rest/resources/ConnectorsResourceTest.java        | 19 +++++++++++++++++++
 2 files changed, 27 insertions(+), 1 deletion(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index 3b41fb7..8728e1c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -306,7 +306,14 @@ public class ConnectorsResource {
                     // this gives two total hops to resolve the request before giving up.
                     boolean recursiveForward = forward == null;
                     RequestTargetException targetException = (RequestTargetException) cause;
-                    String forwardUrl = UriBuilder.fromUri(targetException.forwardUrl())
+                    String forwardedUrl = targetException.forwardUrl();
+                    if (forwardedUrl == null) {
+                        // the target didn't know of the leader at this moment.
+                        throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(),
+                                "Cannot complete request momentarily due to no known leader URL, "
+                                + "likely because a rebalance was underway.");
+                    }
+                    String forwardUrl = UriBuilder.fromUri(forwardedUrl)
                             .path(path)
                             .queryParam("forward", recursiveForward)
                             .build()
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 967b8fc..f604c80 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
 import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.easymock.Capture;
@@ -69,6 +70,8 @@ import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(RestClient.class)
@@ -803,6 +806,22 @@ public class ConnectorsResourceTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testCompleteOrForwardWithErrorAndNoForwardUrl() throws Throwable {
+        final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
+        herder.deleteConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb));
+        String leaderUrl = null;
+        expectAndCallbackException(cb, new NotLeaderException("not leader", leaderUrl));
+
+        PowerMock.replayAll();
+
+        ConnectRestException e = assertThrows(ConnectRestException.class, () -> {
+            connectorsResource.destroyConnector(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
+        });
+        assertTrue(e.getMessage().contains("no known leader URL"));
+        PowerMock.verifyAll();
+    }
+
     private <T> byte[] serializeAsBytes(final T value) throws IOException {
         return new ObjectMapper().writeValueAsBytes(value);
     }