You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2019/05/16 21:29:43 UTC

[kafka] branch trunk updated: Add '?expand' query param for additional info on '/connectors'. (#6658)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5a95c2e  Add '?expand' query param for additional info on '/connectors'. (#6658)
5a95c2e is described below

commit 5a95c2e1cd555d5f3ec148cc7c765d1bb7d716f9
Author: dan norwood <no...@confluent.io>
AuthorDate: Thu May 16 14:29:30 2019 -0700

    Add '?expand' query param for additional info on '/connectors'. (#6658)
    
    Per KIP-465, kept the existing behavior of the `/connectors` resource in Connect's REST API, but added the ability to specify an `?expand` query parameter to get the list of connectors with additional details (status and/or info) on each connector. Added unit tests, and verified that the existing system tests (which use the older list form) still pass.
    
    See https://cwiki.apache.org/confluence/display/KAFKA/KIP-465%3A+Add+Consolidated+Connector+Endpoint+to+Connect+REST+API.
    
    Author: Dan Norwood <no...@confluent.io>
    Reviewer: Randall Hauch <rh...@gmail.com>
---
 .../kafka/connect/runtime/AbstractHerder.java      |  21 ++++
 .../org/apache/kafka/connect/runtime/Herder.java   |  12 ++
 .../runtime/distributed/DistributedHerder.java     |   5 +-
 .../runtime/rest/resources/ConnectorsResource.java |  44 +++++++-
 .../runtime/standalone/StandaloneHerder.java       |   8 +-
 .../kafka/connect/runtime/AbstractHerderTest.java  |  50 +++++++++
 .../runtime/distributed/DistributedHerderTest.java |   4 +-
 .../kafka/connect/runtime/rest/RestServerTest.java |  17 +--
 .../rest/resources/ConnectorsResourceTest.java     | 122 +++++++++++++++++----
 9 files changed, 232 insertions(+), 51 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 82fdecc..8e7d016 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -209,6 +209,27 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
     protected abstract Map<String, String> config(String connName);
 
     @Override
+    public Collection<String> connectors() {
+        return configBackingStore.snapshot().connectors();
+    }
+
+    @Override
+    public ConnectorInfo connectorInfo(String connector) {
+        final ClusterConfigState configState = configBackingStore.snapshot();
+
+        if (!configState.contains(connector))
+            return null;
+        Map<String, String> config = configState.rawConnectorConfig(connector);
+
+        return new ConnectorInfo(
+            connector,
+            config,
+            configState.tasks(connector),
+            connectorTypeForClass(config.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG))
+        );
+    }
+
+    @Override
     public ConnectorStateInfo connectorStatus(String connName) {
         ConnectorStatus connector = statusBackingStore.get(connName);
         if (connector == null)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index c572e20..550348f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -124,6 +124,18 @@ public interface Herder {
     void putTaskConfigs(String connName, List<Map<String, String>> configs, Callback<Void> callback);
 
     /**
+     * Get a list of connectors currently running in this cluster.
+     * @return A list of connector names
+     */
+    Collection<String> connectors();
+
+    /**
+     * Get the definition of a connector (its name, config, tasks and type).
+     * @param connName name of the connector
+     */
+    ConnectorInfo connectorInfo(String connName);
+
+    /**
      * Lookup the current status of a connector.
      * @param connName name of the connector
      */
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 711b6c9..96d4bfc 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -451,10 +451,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                         if (!configState.contains(connName)) {
                             callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
                         } else {
-                            Map<String, String> config = configState.rawConnectorConfig(connName);
-                            callback.onCompletion(null, new ConnectorInfo(connName, config,
-                                configState.tasks(connName),
-                                connectorTypeForClass(config.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG))));
+                            callback.onCompletion(null, connectorInfo(connName));
                         }
                         return null;
                     }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index 4a04512..61cf5da 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.connect.runtime.rest.resources;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+
+import org.apache.kafka.connect.errors.NotFoundException;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.WorkerConfig;
@@ -44,11 +46,14 @@ import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.core.UriInfo;
+
 import java.net.URI;
-import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -79,11 +84,38 @@ public class ConnectorsResource {
 
     @GET
     @Path("/")
-    public Collection<String> listConnectors(final @QueryParam("forward") Boolean forward) throws Throwable {
-        FutureCallback<Collection<String>> cb = new FutureCallback<>();
-        herder.connectors(cb);
-        return completeOrForwardRequest(cb, "/connectors", "GET", null, new TypeReference<Collection<String>>() {
-        }, forward);
+    public Response listConnectors(
+        final @Context UriInfo uriInfo
+    ) throws Throwable {
+        if (uriInfo.getQueryParameters().containsKey("expand")) {
+            Map<String, Map<String, Object>> out = new HashMap<>();
+            for (String connector : herder.connectors()) {
+                try {
+                    Map<String, Object> connectorExpansions = new HashMap<>();
+                    for (String expansion : uriInfo.getQueryParameters().get("expand")) {
+                        switch (expansion) {
+                            case "status":
+                                connectorExpansions.put("status", herder.connectorStatus(connector));
+                                break;
+                            case "info":
+                                connectorExpansions.put("info", herder.connectorInfo(connector));
+                                break;
+                            default:
+                                log.info("Ignoring unknown expanion type {}", expansion);
+                        }
+                    }
+                    out.put(connector, connectorExpansions);
+                } catch (NotFoundException e) {
+                    // this likely means that a connector has been removed while we look its info up
+                    // we can just not include this connector in the return entity
+                    log.debug("Unable to get connector info for {} on this worker", connector);
+                }
+
+            }
+            return Response.ok(out).build();
+        } else {
+            return Response.ok(herder.connectors()).build();
+        }
     }
 
     @POST
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 172c9b2..7b6d16a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -103,7 +103,7 @@ public class StandaloneHerder extends AbstractHerder {
         // There's no coordination/hand-off to do here since this is all standalone. Instead, we
         // should just clean up the stuff we normally would, i.e. cleanly checkpoint and shutdown all
         // the tasks.
-        for (String connName : configState.connectors()) {
+        for (String connName : connectors()) {
             removeConnectorTasks(connName);
             worker.stopConnector(connName);
         }
@@ -118,12 +118,12 @@ public class StandaloneHerder extends AbstractHerder {
 
     @Override
     public synchronized void connectors(Callback<Collection<String>> callback) {
-        callback.onCompletion(null, configState.connectors());
+        callback.onCompletion(null, connectors());
     }
-
+    
     @Override
     public synchronized void connectorInfo(String connName, Callback<ConnectorInfo> callback) {
-        ConnectorInfo connectorInfo = createConnectorInfo(connName);
+        ConnectorInfo connectorInfo = connectorInfo(connName);
         if (connectorInfo == null) {
             callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
             return;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index fc557c6..f7ee8a6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -125,6 +125,56 @@ public class AbstractHerderTest {
     @MockStrict private StatusBackingStore statusStore;
 
     @Test
+    public void testConnectors() {
+        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
+            .withConstructor(
+                Worker.class,
+                String.class,
+                String.class,
+                StatusBackingStore.class,
+                ConfigBackingStore.class
+            )
+            .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore)
+            .addMockedMethod("generation")
+            .createMock();
+
+        EasyMock.expect(herder.generation()).andStubReturn(generation);
+        EasyMock.expect(herder.config(connector)).andReturn(null);
+        EasyMock.expect(configStore.snapshot()).andReturn(SNAPSHOT);
+        replayAll();
+        assertEquals(Collections.singleton(CONN1), new HashSet<>(herder.connectors()));
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testConnectorStatus() {
+        ConnectorTaskId taskId = new ConnectorTaskId(connector, 0);
+        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
+            .withConstructor(
+                Worker.class,
+                String.class,
+                String.class,
+                StatusBackingStore.class,
+                ConfigBackingStore.class
+            )
+            .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore)
+            .addMockedMethod("generation")
+            .createMock();
+
+        EasyMock.expect(herder.generation()).andStubReturn(generation);
+        EasyMock.expect(herder.config(connector)).andReturn(null);
+        EasyMock.expect(statusStore.get(connector))
+            .andReturn(new ConnectorStatus(connector, AbstractStatus.State.RUNNING, workerId, generation));
+        EasyMock.expect(statusStore.getAll(connector))
+            .andReturn(Collections.singletonList(
+                new TaskStatus(taskId, AbstractStatus.State.UNASSIGNED, workerId, generation)));
+
+        replayAll();
+        ConnectorStateInfo csi = herder.connectorStatus(connector);
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void connectorStatus() {
         ConnectorTaskId taskId = new ConnectorTaskId(connector, 0);
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index b8ec0f1..fcac1fb 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -1295,6 +1295,7 @@ public class DistributedHerderTest {
         EasyMock.expect(member.memberId()).andStubReturn("leader");
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
         expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
+        EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT).times(2);
 
         WorkerConfigTransformer configTransformer = EasyMock.mock(WorkerConfigTransformer.class);
         EasyMock.expect(configTransformer.transform(EasyMock.eq(CONN1), EasyMock.anyObject()))
@@ -1376,6 +1377,7 @@ public class DistributedHerderTest {
         EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef());
         EasyMock.expect(connectorMock.validate(CONN1_CONFIG_UPDATED)).andReturn(new Config(Collections.<ConfigValue>emptyList()));
         EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);
+        EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT);
 
         configBackingStore.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED);
         PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
@@ -1388,7 +1390,7 @@ public class DistributedHerderTest {
         });
         // As a result of reconfig, should need to update snapshot. With only connector updates, we'll just restart
         // connector without rebalance
-        EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT_UPDATED_CONN1_CONFIG);
+        EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT_UPDATED_CONN1_CONFIG).times(2);
         worker.stopConnector(CONN1);
         PowerMock.expectLastCall().andReturn(true);
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
index 91aa5e7..3609fb3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
@@ -30,8 +30,6 @@ import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
-import org.apache.kafka.connect.util.Callback;
-import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Assert;
@@ -47,7 +45,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -212,12 +209,7 @@ public class RestServerTest {
                                            ConnectRestExtension.class))
             .andStubReturn(Collections.emptyList());
 
-        final Capture<Callback<Collection<String>>> connectorsCallback = EasyMock.newCapture();
-        herder.connectors(EasyMock.capture(connectorsCallback));
-        PowerMock.expectLastCall().andAnswer(() -> {
-            connectorsCallback.getValue().onCompletion(null, Arrays.asList("a", "b"));
-            return null;
-        });
+        EasyMock.expect(herder.connectors()).andReturn(Arrays.asList("a", "b"));
 
         PowerMock.replayAll();
 
@@ -270,12 +262,7 @@ public class RestServerTest {
             workerConfig,
             ConnectRestExtension.class)).andStubReturn(Collections.emptyList());
 
-        final Capture<Callback<Collection<String>>> connectorsCallback = EasyMock.newCapture();
-        herder.connectors(EasyMock.capture(connectorsCallback));
-        PowerMock.expectLastCall().andAnswer(() -> {
-            connectorsCallback.getValue().onCompletion(null, Arrays.asList("a", "b"));
-            return null;
-        });
+        EasyMock.expect(herder.connectors()).andReturn(Arrays.asList("a", "b"));
 
         PowerMock.replayAll();
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index f84cd25..5dc7f1e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.connect.runtime.rest.resources;
 import com.fasterxml.jackson.core.type.TypeReference;
 
 import org.apache.kafka.connect.errors.AlreadyExistsException;
-import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.NotFoundException;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
@@ -28,6 +27,7 @@ import org.apache.kafka.connect.runtime.distributed.NotAssignedException;
 import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
 import org.apache.kafka.connect.runtime.rest.RestClient;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
 import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
@@ -47,6 +47,10 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import javax.ws.rs.BadRequestException;
+import javax.ws.rs.core.MultivaluedHashMap;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.UriInfo;
+
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -121,12 +125,18 @@ public class ConnectorsResourceTest {
     @Mock
     private Herder herder;
     private ConnectorsResource connectorsResource;
+    private UriInfo forward;
 
     @Before
     public void setUp() throws NoSuchMethodException {
         PowerMock.mockStatic(RestClient.class,
                 RestClient.class.getMethod("httpRequest", String.class, String.class, Object.class, TypeReference.class, WorkerConfig.class));
         connectorsResource = new ConnectorsResource(herder, null);
+        forward = EasyMock.mock(UriInfo.class);
+        MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<>();
+        queryParams.putSingle("forward", "true");
+        EasyMock.expect(forward.getQueryParameters()).andReturn(queryParams).anyTimes();
+        EasyMock.replay(forward);
     }
 
     private static final Map<String, String> getConnectorConfig(Map<String, String> mapToClone) {
@@ -137,12 +147,11 @@ public class ConnectorsResourceTest {
     @Test
     public void testListConnectors() throws Throwable {
         final Capture<Callback<Collection<String>>> cb = Capture.newInstance();
-        herder.connectors(EasyMock.capture(cb));
-        expectAndCallbackResult(cb, Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME));
+        EasyMock.expect(herder.connectors()).andReturn(Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME));
 
         PowerMock.replayAll();
 
-        Collection<String> connectors = connectorsResource.listConnectors(FORWARD);
+        Collection<String> connectors = (Collection<String>) connectorsResource.listConnectors(forward).getEntity();
         // Ordering isn't guaranteed, compare sets
         assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), new HashSet<>(connectors));
 
@@ -150,36 +159,107 @@ public class ConnectorsResourceTest {
     }
 
     @Test
-    public void testListConnectorsNotLeader() throws Throwable {
-        final Capture<Callback<Collection<String>>> cb = Capture.newInstance();
-        herder.connectors(EasyMock.capture(cb));
-        expectAndCallbackNotLeaderException(cb);
-        // Should forward request
-        EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false"), EasyMock.eq("GET"),
-                EasyMock.isNull(), EasyMock.anyObject(TypeReference.class), EasyMock.anyObject(WorkerConfig.class)))
-                .andReturn(new RestClient.HttpResponse<>(200, new HashMap<String, String>(), Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME)));
+    public void testExpandConnectorsStatus() throws Throwable {
+        EasyMock.expect(herder.connectors()).andReturn(Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME));
+        ConnectorStateInfo connector = EasyMock.mock(ConnectorStateInfo.class);
+        ConnectorStateInfo connector2 = EasyMock.mock(ConnectorStateInfo.class);
+        EasyMock.expect(herder.connectorStatus(CONNECTOR2_NAME)).andReturn(connector2);
+        EasyMock.expect(herder.connectorStatus(CONNECTOR_NAME)).andReturn(connector);
+
+        forward = EasyMock.mock(UriInfo.class);
+        MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<>();
+        queryParams.putSingle("expand", "status");
+        EasyMock.expect(forward.getQueryParameters()).andReturn(queryParams).anyTimes();
+        EasyMock.replay(forward);
 
         PowerMock.replayAll();
 
-        Collection<String> connectors = connectorsResource.listConnectors(FORWARD);
+        Map<String, Map<String, Object>> expanded = (Map<String, Map<String, Object>>) connectorsResource.listConnectors(forward).getEntity();
         // Ordering isn't guaranteed, compare sets
-        assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), new HashSet<>(connectors));
+        assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), expanded.keySet());
+        assertEquals(connector2, expanded.get(CONNECTOR2_NAME).get("status"));
+        assertEquals(connector, expanded.get(CONNECTOR_NAME).get("status"));
+        PowerMock.verifyAll();
+    }
 
+    @Test
+    public void testExpandConnectorsInfo() throws Throwable {
+        EasyMock.expect(herder.connectors()).andReturn(Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME));
+        ConnectorInfo connector = EasyMock.mock(ConnectorInfo.class);
+        ConnectorInfo connector2 = EasyMock.mock(ConnectorInfo.class);
+        EasyMock.expect(herder.connectorInfo(CONNECTOR2_NAME)).andReturn(connector2);
+        EasyMock.expect(herder.connectorInfo(CONNECTOR_NAME)).andReturn(connector);
+
+        forward = EasyMock.mock(UriInfo.class);
+        MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<>();
+        queryParams.putSingle("expand", "info");
+        EasyMock.expect(forward.getQueryParameters()).andReturn(queryParams).anyTimes();
+        EasyMock.replay(forward);
+
+        PowerMock.replayAll();
+
+        Map<String, Map<String, Object>> expanded = (Map<String, Map<String, Object>>) connectorsResource.listConnectors(forward).getEntity();
+        // Ordering isn't guaranteed, compare sets
+        assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), expanded.keySet());
+        assertEquals(connector2, expanded.get(CONNECTOR2_NAME).get("info"));
+        assertEquals(connector, expanded.get(CONNECTOR_NAME).get("info"));
         PowerMock.verifyAll();
     }
 
-    @Test(expected = ConnectException.class)
-    public void testListConnectorsNotSynced() throws Throwable {
-        final Capture<Callback<Collection<String>>> cb = Capture.newInstance();
-        herder.connectors(EasyMock.capture(cb));
-        expectAndCallbackException(cb, new ConnectException("not synced"));
+    @Test
+    public void testFullExpandConnectors() throws Throwable {
+        EasyMock.expect(herder.connectors()).andReturn(Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME));
+        ConnectorInfo connectorInfo = EasyMock.mock(ConnectorInfo.class);
+        ConnectorInfo connectorInfo2 = EasyMock.mock(ConnectorInfo.class);
+        EasyMock.expect(herder.connectorInfo(CONNECTOR2_NAME)).andReturn(connectorInfo2);
+        EasyMock.expect(herder.connectorInfo(CONNECTOR_NAME)).andReturn(connectorInfo);
+        ConnectorStateInfo connector = EasyMock.mock(ConnectorStateInfo.class);
+        ConnectorStateInfo connector2 = EasyMock.mock(ConnectorStateInfo.class);
+        EasyMock.expect(herder.connectorStatus(CONNECTOR2_NAME)).andReturn(connector2);
+        EasyMock.expect(herder.connectorStatus(CONNECTOR_NAME)).andReturn(connector);
+
+        forward = EasyMock.mock(UriInfo.class);
+        MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<>();
+        queryParams.put("expand", Arrays.asList("info", "status"));
+        EasyMock.expect(forward.getQueryParameters()).andReturn(queryParams).anyTimes();
+        EasyMock.replay(forward);
+
+        PowerMock.replayAll();
+
+        Map<String, Map<String, Object>> expanded = (Map<String, Map<String, Object>>) connectorsResource.listConnectors(forward).getEntity();
+        // Ordering isn't guaranteed, compare sets
+        assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), expanded.keySet());
+        assertEquals(connectorInfo2, expanded.get(CONNECTOR2_NAME).get("info"));
+        assertEquals(connectorInfo, expanded.get(CONNECTOR_NAME).get("info"));
+        assertEquals(connector2, expanded.get(CONNECTOR2_NAME).get("status"));
+        assertEquals(connector, expanded.get(CONNECTOR_NAME).get("status"));
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testExpandConnectorsWithConnectorNotFound() throws Throwable {
+        EasyMock.expect(herder.connectors()).andReturn(Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME));
+        ConnectorStateInfo connector = EasyMock.mock(ConnectorStateInfo.class);
+        ConnectorStateInfo connector2 = EasyMock.mock(ConnectorStateInfo.class);
+        EasyMock.expect(herder.connectorStatus(CONNECTOR2_NAME)).andReturn(connector2);
+        EasyMock.expect(herder.connectorStatus(CONNECTOR_NAME)).andThrow(EasyMock.mock(NotFoundException.class));
+
+        forward = EasyMock.mock(UriInfo.class);
+        MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<>();
+        queryParams.putSingle("expand", "status");
+        EasyMock.expect(forward.getQueryParameters()).andReturn(queryParams).anyTimes();
+        EasyMock.replay(forward);
 
         PowerMock.replayAll();
 
-        // throws
-        connectorsResource.listConnectors(FORWARD);
+        Map<String, Map<String, Object>> expanded = (Map<String, Map<String, Object>>) connectorsResource.listConnectors(forward).getEntity();
+        // Ordering isn't guaranteed, compare sets
+        assertEquals(Collections.singleton(CONNECTOR2_NAME), expanded.keySet());
+        assertEquals(connector2, expanded.get(CONNECTOR2_NAME).get("status"));
+        PowerMock.verifyAll();
     }
 
+
     @Test
     public void testCreateConnector() throws Throwable {
         CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));