You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2019/05/16 21:29:43 UTC
[kafka] branch trunk updated: Add '?expand' query param for
additional info on '/connectors'. (#6658)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5a95c2e Add '?expand' query param for additional info on '/connectors'. (#6658)
5a95c2e is described below
commit 5a95c2e1cd555d5f3ec148cc7c765d1bb7d716f9
Author: dan norwood <no...@confluent.io>
AuthorDate: Thu May 16 14:29:30 2019 -0700
Add '?expand' query param for additional info on '/connectors'. (#6658)
Per KIP-465, kept the existing behavior of the `/connectors` resource in Connect's REST API, but added the ability to specify an `?expand` query parameter to get a list of connectors with additional details (status and/or info) on each connector. Added unit tests, and verified that the existing system tests (which use the older list form) still pass.
See https://cwiki.apache.org/confluence/display/KAFKA/KIP-465%3A+Add+Consolidated+Connector+Endpoint+to+Connect+REST+API.
Author: Dan Norwood <no...@confluent.io>
Reviewer: Randall Hauch <rh...@gmail.com>
---
.../kafka/connect/runtime/AbstractHerder.java | 21 ++++
.../org/apache/kafka/connect/runtime/Herder.java | 12 ++
.../runtime/distributed/DistributedHerder.java | 5 +-
.../runtime/rest/resources/ConnectorsResource.java | 44 +++++++-
.../runtime/standalone/StandaloneHerder.java | 8 +-
.../kafka/connect/runtime/AbstractHerderTest.java | 50 +++++++++
.../runtime/distributed/DistributedHerderTest.java | 4 +-
.../kafka/connect/runtime/rest/RestServerTest.java | 17 +--
.../rest/resources/ConnectorsResourceTest.java | 122 +++++++++++++++++----
9 files changed, 232 insertions(+), 51 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 82fdecc..8e7d016 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -209,6 +209,27 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
protected abstract Map<String, String> config(String connName);
@Override
+ public Collection<String> connectors() {
+ return configBackingStore.snapshot().connectors();
+ }
+
+ @Override
+ public ConnectorInfo connectorInfo(String connector) {
+ final ClusterConfigState configState = configBackingStore.snapshot();
+
+ if (!configState.contains(connector))
+ return null;
+ Map<String, String> config = configState.rawConnectorConfig(connector);
+
+ return new ConnectorInfo(
+ connector,
+ config,
+ configState.tasks(connector),
+ connectorTypeForClass(config.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG))
+ );
+ }
+
+ @Override
public ConnectorStateInfo connectorStatus(String connName) {
ConnectorStatus connector = statusBackingStore.get(connName);
if (connector == null)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index c572e20..550348f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -124,6 +124,18 @@ public interface Herder {
void putTaskConfigs(String connName, List<Map<String, String>> configs, Callback<Void> callback);
/**
+ * Get a list of connectors currently running in this cluster.
+ * @return A list of connector names
+ */
+ Collection<String> connectors();
+
+ /**
+ * Get the definition (name, configuration, tasks and type) of a connector; its status is looked up separately via connectorStatus.
+ * @param connName name of the connector
+ */
+ ConnectorInfo connectorInfo(String connName);
+
+ /**
* Lookup the current status of a connector.
* @param connName name of the connector
*/
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 711b6c9..96d4bfc 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -451,10 +451,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
if (!configState.contains(connName)) {
callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
} else {
- Map<String, String> config = configState.rawConnectorConfig(connName);
- callback.onCompletion(null, new ConnectorInfo(connName, config,
- configState.tasks(connName),
- connectorTypeForClass(config.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG))));
+ callback.onCompletion(null, connectorInfo(connName));
}
return null;
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index 4a04512..61cf5da 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -17,6 +17,8 @@
package org.apache.kafka.connect.runtime.rest.resources;
import com.fasterxml.jackson.core.type.TypeReference;
+
+import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.WorkerConfig;
@@ -44,11 +46,14 @@ import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.core.UriInfo;
+
import java.net.URI;
-import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@@ -79,11 +84,38 @@ public class ConnectorsResource {
@GET
@Path("/")
- public Collection<String> listConnectors(final @QueryParam("forward") Boolean forward) throws Throwable {
- FutureCallback<Collection<String>> cb = new FutureCallback<>();
- herder.connectors(cb);
- return completeOrForwardRequest(cb, "/connectors", "GET", null, new TypeReference<Collection<String>>() {
- }, forward);
+ public Response listConnectors(
+ final @Context UriInfo uriInfo
+ ) throws Throwable {
+ if (uriInfo.getQueryParameters().containsKey("expand")) {
+ Map<String, Map<String, Object>> out = new HashMap<>();
+ for (String connector : herder.connectors()) {
+ try {
+ Map<String, Object> connectorExpansions = new HashMap<>();
+ for (String expansion : uriInfo.getQueryParameters().get("expand")) {
+ switch (expansion) {
+ case "status":
+ connectorExpansions.put("status", herder.connectorStatus(connector));
+ break;
+ case "info":
+ connectorExpansions.put("info", herder.connectorInfo(connector));
+ break;
+ default:
+ log.info("Ignoring unknown expanion type {}", expansion);
+ }
+ }
+ out.put(connector, connectorExpansions);
+ } catch (NotFoundException e) {
+ // this likely means that a connector has been removed while we look its info up
+ // we can just not include this connector in the return entity
+ log.debug("Unable to get connector info for {} on this worker", connector);
+ }
+
+ }
+ return Response.ok(out).build();
+ } else {
+ return Response.ok(herder.connectors()).build();
+ }
}
@POST
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 172c9b2..7b6d16a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -103,7 +103,7 @@ public class StandaloneHerder extends AbstractHerder {
// There's no coordination/hand-off to do here since this is all standalone. Instead, we
// should just clean up the stuff we normally would, i.e. cleanly checkpoint and shutdown all
// the tasks.
- for (String connName : configState.connectors()) {
+ for (String connName : connectors()) {
removeConnectorTasks(connName);
worker.stopConnector(connName);
}
@@ -118,12 +118,12 @@ public class StandaloneHerder extends AbstractHerder {
@Override
public synchronized void connectors(Callback<Collection<String>> callback) {
- callback.onCompletion(null, configState.connectors());
+ callback.onCompletion(null, connectors());
}
-
+
@Override
public synchronized void connectorInfo(String connName, Callback<ConnectorInfo> callback) {
- ConnectorInfo connectorInfo = createConnectorInfo(connName);
+ ConnectorInfo connectorInfo = connectorInfo(connName);
if (connectorInfo == null) {
callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
return;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index fc557c6..f7ee8a6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -125,6 +125,56 @@ public class AbstractHerderTest {
@MockStrict private StatusBackingStore statusStore;
@Test
+ public void testConnectors() {
+ AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
+ .withConstructor(
+ Worker.class,
+ String.class,
+ String.class,
+ StatusBackingStore.class,
+ ConfigBackingStore.class
+ )
+ .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore)
+ .addMockedMethod("generation")
+ .createMock();
+
+ EasyMock.expect(herder.generation()).andStubReturn(generation);
+ EasyMock.expect(herder.config(connector)).andReturn(null);
+ EasyMock.expect(configStore.snapshot()).andReturn(SNAPSHOT);
+ replayAll();
+ assertEquals(Collections.singleton(CONN1), new HashSet<>(herder.connectors()));
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testConnectorStatus() {
+ ConnectorTaskId taskId = new ConnectorTaskId(connector, 0);
+ AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
+ .withConstructor(
+ Worker.class,
+ String.class,
+ String.class,
+ StatusBackingStore.class,
+ ConfigBackingStore.class
+ )
+ .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore)
+ .addMockedMethod("generation")
+ .createMock();
+
+ EasyMock.expect(herder.generation()).andStubReturn(generation);
+ EasyMock.expect(herder.config(connector)).andReturn(null);
+ EasyMock.expect(statusStore.get(connector))
+ .andReturn(new ConnectorStatus(connector, AbstractStatus.State.RUNNING, workerId, generation));
+ EasyMock.expect(statusStore.getAll(connector))
+ .andReturn(Collections.singletonList(
+ new TaskStatus(taskId, AbstractStatus.State.UNASSIGNED, workerId, generation)));
+
+ replayAll();
+ ConnectorStateInfo csi = herder.connectorStatus(connector);
+ PowerMock.verifyAll();
+ }
+
+ @Test
public void connectorStatus() {
ConnectorTaskId taskId = new ConnectorTaskId(connector, 0);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index b8ec0f1..fcac1fb 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -1295,6 +1295,7 @@ public class DistributedHerderTest {
EasyMock.expect(member.memberId()).andStubReturn("leader");
EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
+ EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT).times(2);
WorkerConfigTransformer configTransformer = EasyMock.mock(WorkerConfigTransformer.class);
EasyMock.expect(configTransformer.transform(EasyMock.eq(CONN1), EasyMock.anyObject()))
@@ -1376,6 +1377,7 @@ public class DistributedHerderTest {
EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef());
EasyMock.expect(connectorMock.validate(CONN1_CONFIG_UPDATED)).andReturn(new Config(Collections.<ConfigValue>emptyList()));
EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);
+ EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT);
configBackingStore.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED);
PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
@@ -1388,7 +1390,7 @@ public class DistributedHerderTest {
});
// As a result of reconfig, should need to update snapshot. With only connector updates, we'll just restart
// connector without rebalance
- EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT_UPDATED_CONN1_CONFIG);
+ EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT_UPDATED_CONN1_CONFIG).times(2);
worker.stopConnector(CONN1);
PowerMock.expectLastCall().andReturn(true);
worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
index 91aa5e7..3609fb3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
@@ -30,8 +30,6 @@ import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
-import org.apache.kafka.connect.util.Callback;
-import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
@@ -47,7 +45,6 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -212,12 +209,7 @@ public class RestServerTest {
ConnectRestExtension.class))
.andStubReturn(Collections.emptyList());
- final Capture<Callback<Collection<String>>> connectorsCallback = EasyMock.newCapture();
- herder.connectors(EasyMock.capture(connectorsCallback));
- PowerMock.expectLastCall().andAnswer(() -> {
- connectorsCallback.getValue().onCompletion(null, Arrays.asList("a", "b"));
- return null;
- });
+ EasyMock.expect(herder.connectors()).andReturn(Arrays.asList("a", "b"));
PowerMock.replayAll();
@@ -270,12 +262,7 @@ public class RestServerTest {
workerConfig,
ConnectRestExtension.class)).andStubReturn(Collections.emptyList());
- final Capture<Callback<Collection<String>>> connectorsCallback = EasyMock.newCapture();
- herder.connectors(EasyMock.capture(connectorsCallback));
- PowerMock.expectLastCall().andAnswer(() -> {
- connectorsCallback.getValue().onCompletion(null, Arrays.asList("a", "b"));
- return null;
- });
+ EasyMock.expect(herder.connectors()).andReturn(Arrays.asList("a", "b"));
PowerMock.replayAll();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index f84cd25..5dc7f1e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.connect.runtime.rest.resources;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.kafka.connect.errors.AlreadyExistsException;
-import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
@@ -28,6 +27,7 @@ import org.apache.kafka.connect.runtime.distributed.NotAssignedException;
import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
@@ -47,6 +47,10 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import javax.ws.rs.BadRequestException;
+import javax.ws.rs.core.MultivaluedHashMap;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.UriInfo;
+
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
@@ -121,12 +125,18 @@ public class ConnectorsResourceTest {
@Mock
private Herder herder;
private ConnectorsResource connectorsResource;
+ private UriInfo forward;
@Before
public void setUp() throws NoSuchMethodException {
PowerMock.mockStatic(RestClient.class,
RestClient.class.getMethod("httpRequest", String.class, String.class, Object.class, TypeReference.class, WorkerConfig.class));
connectorsResource = new ConnectorsResource(herder, null);
+ forward = EasyMock.mock(UriInfo.class);
+ MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<>();
+ queryParams.putSingle("forward", "true");
+ EasyMock.expect(forward.getQueryParameters()).andReturn(queryParams).anyTimes();
+ EasyMock.replay(forward);
}
private static final Map<String, String> getConnectorConfig(Map<String, String> mapToClone) {
@@ -137,12 +147,11 @@ public class ConnectorsResourceTest {
@Test
public void testListConnectors() throws Throwable {
final Capture<Callback<Collection<String>>> cb = Capture.newInstance();
- herder.connectors(EasyMock.capture(cb));
- expectAndCallbackResult(cb, Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME));
+ EasyMock.expect(herder.connectors()).andReturn(Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME));
PowerMock.replayAll();
- Collection<String> connectors = connectorsResource.listConnectors(FORWARD);
+ Collection<String> connectors = (Collection<String>) connectorsResource.listConnectors(forward).getEntity();
// Ordering isn't guaranteed, compare sets
assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), new HashSet<>(connectors));
@@ -150,36 +159,107 @@ public class ConnectorsResourceTest {
}
@Test
- public void testListConnectorsNotLeader() throws Throwable {
- final Capture<Callback<Collection<String>>> cb = Capture.newInstance();
- herder.connectors(EasyMock.capture(cb));
- expectAndCallbackNotLeaderException(cb);
- // Should forward request
- EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false"), EasyMock.eq("GET"),
- EasyMock.isNull(), EasyMock.anyObject(TypeReference.class), EasyMock.anyObject(WorkerConfig.class)))
- .andReturn(new RestClient.HttpResponse<>(200, new HashMap<String, String>(), Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME)));
+ public void testExpandConnectorsStatus() throws Throwable {
+ EasyMock.expect(herder.connectors()).andReturn(Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME));
+ ConnectorStateInfo connector = EasyMock.mock(ConnectorStateInfo.class);
+ ConnectorStateInfo connector2 = EasyMock.mock(ConnectorStateInfo.class);
+ EasyMock.expect(herder.connectorStatus(CONNECTOR2_NAME)).andReturn(connector2);
+ EasyMock.expect(herder.connectorStatus(CONNECTOR_NAME)).andReturn(connector);
+
+ forward = EasyMock.mock(UriInfo.class);
+ MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<>();
+ queryParams.putSingle("expand", "status");
+ EasyMock.expect(forward.getQueryParameters()).andReturn(queryParams).anyTimes();
+ EasyMock.replay(forward);
PowerMock.replayAll();
- Collection<String> connectors = connectorsResource.listConnectors(FORWARD);
+ Map<String, Map<String, Object>> expanded = (Map<String, Map<String, Object>>) connectorsResource.listConnectors(forward).getEntity();
// Ordering isn't guaranteed, compare sets
- assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), new HashSet<>(connectors));
+ assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), expanded.keySet());
+ assertEquals(connector2, expanded.get(CONNECTOR2_NAME).get("status"));
+ assertEquals(connector, expanded.get(CONNECTOR_NAME).get("status"));
+ PowerMock.verifyAll();
+ }
+ @Test
+ public void testExpandConnectorsInfo() throws Throwable {
+ EasyMock.expect(herder.connectors()).andReturn(Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME));
+ ConnectorInfo connector = EasyMock.mock(ConnectorInfo.class);
+ ConnectorInfo connector2 = EasyMock.mock(ConnectorInfo.class);
+ EasyMock.expect(herder.connectorInfo(CONNECTOR2_NAME)).andReturn(connector2);
+ EasyMock.expect(herder.connectorInfo(CONNECTOR_NAME)).andReturn(connector);
+
+ forward = EasyMock.mock(UriInfo.class);
+ MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<>();
+ queryParams.putSingle("expand", "info");
+ EasyMock.expect(forward.getQueryParameters()).andReturn(queryParams).anyTimes();
+ EasyMock.replay(forward);
+
+ PowerMock.replayAll();
+
+ Map<String, Map<String, Object>> expanded = (Map<String, Map<String, Object>>) connectorsResource.listConnectors(forward).getEntity();
+ // Ordering isn't guaranteed, compare sets
+ assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), expanded.keySet());
+ assertEquals(connector2, expanded.get(CONNECTOR2_NAME).get("info"));
+ assertEquals(connector, expanded.get(CONNECTOR_NAME).get("info"));
PowerMock.verifyAll();
}
- @Test(expected = ConnectException.class)
- public void testListConnectorsNotSynced() throws Throwable {
- final Capture<Callback<Collection<String>>> cb = Capture.newInstance();
- herder.connectors(EasyMock.capture(cb));
- expectAndCallbackException(cb, new ConnectException("not synced"));
+ @Test
+ public void testFullExpandConnectors() throws Throwable {
+ EasyMock.expect(herder.connectors()).andReturn(Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME));
+ ConnectorInfo connectorInfo = EasyMock.mock(ConnectorInfo.class);
+ ConnectorInfo connectorInfo2 = EasyMock.mock(ConnectorInfo.class);
+ EasyMock.expect(herder.connectorInfo(CONNECTOR2_NAME)).andReturn(connectorInfo2);
+ EasyMock.expect(herder.connectorInfo(CONNECTOR_NAME)).andReturn(connectorInfo);
+ ConnectorStateInfo connector = EasyMock.mock(ConnectorStateInfo.class);
+ ConnectorStateInfo connector2 = EasyMock.mock(ConnectorStateInfo.class);
+ EasyMock.expect(herder.connectorStatus(CONNECTOR2_NAME)).andReturn(connector2);
+ EasyMock.expect(herder.connectorStatus(CONNECTOR_NAME)).andReturn(connector);
+
+ forward = EasyMock.mock(UriInfo.class);
+ MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<>();
+ queryParams.put("expand", Arrays.asList("info", "status"));
+ EasyMock.expect(forward.getQueryParameters()).andReturn(queryParams).anyTimes();
+ EasyMock.replay(forward);
+
+ PowerMock.replayAll();
+
+ Map<String, Map<String, Object>> expanded = (Map<String, Map<String, Object>>) connectorsResource.listConnectors(forward).getEntity();
+ // Ordering isn't guaranteed, compare sets
+ assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), expanded.keySet());
+ assertEquals(connectorInfo2, expanded.get(CONNECTOR2_NAME).get("info"));
+ assertEquals(connectorInfo, expanded.get(CONNECTOR_NAME).get("info"));
+ assertEquals(connector2, expanded.get(CONNECTOR2_NAME).get("status"));
+ assertEquals(connector, expanded.get(CONNECTOR_NAME).get("status"));
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testExpandConnectorsWithConnectorNotFound() throws Throwable {
+ EasyMock.expect(herder.connectors()).andReturn(Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME));
+ ConnectorStateInfo connector = EasyMock.mock(ConnectorStateInfo.class);
+ ConnectorStateInfo connector2 = EasyMock.mock(ConnectorStateInfo.class);
+ EasyMock.expect(herder.connectorStatus(CONNECTOR2_NAME)).andReturn(connector2);
+ EasyMock.expect(herder.connectorStatus(CONNECTOR_NAME)).andThrow(EasyMock.mock(NotFoundException.class));
+
+ forward = EasyMock.mock(UriInfo.class);
+ MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<>();
+ queryParams.putSingle("expand", "status");
+ EasyMock.expect(forward.getQueryParameters()).andReturn(queryParams).anyTimes();
+ EasyMock.replay(forward);
PowerMock.replayAll();
- // throws
- connectorsResource.listConnectors(FORWARD);
+ Map<String, Map<String, Object>> expanded = (Map<String, Map<String, Object>>) connectorsResource.listConnectors(forward).getEntity();
+ // Ordering isn't guaranteed, compare sets
+ assertEquals(Collections.singleton(CONNECTOR2_NAME), expanded.keySet());
+ assertEquals(connector2, expanded.get(CONNECTOR2_NAME).get("status"));
+ PowerMock.verifyAll();
}
+
@Test
public void testCreateConnector() throws Throwable {
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));