You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/10/30 23:00:13 UTC
[2/4] kafka git commit: KAFKA-2369: Add REST API for Copycat.
http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/resources/ConnectorsResource.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/resources/ConnectorsResource.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/resources/ConnectorsResource.java
new file mode 100644
index 0000000..823155e
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/resources/ConnectorsResource.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.rest.resources;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.kafka.copycat.runtime.ConnectorConfig;
+import org.apache.kafka.copycat.runtime.Herder;
+import org.apache.kafka.copycat.runtime.distributed.NotLeaderException;
+import org.apache.kafka.copycat.runtime.rest.RestServer;
+import org.apache.kafka.copycat.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.copycat.runtime.rest.entities.CreateConnectorRequest;
+import org.apache.kafka.copycat.runtime.rest.entities.TaskInfo;
+import org.apache.kafka.copycat.runtime.rest.errors.CopycatRestException;
+import org.apache.kafka.copycat.util.FutureCallback;
+
+import javax.servlet.ServletContext;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+@Path("/connectors")
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+public class ConnectorsResource {
+ // TODO: This should not be so long. However, due to potentially long rebalances that may have to wait a full
+ // session timeout to complete, during which we cannot serve some requests. Ideally we could reduce this, but
+ // we need to consider all possible scenarios this could fail. It might be ok to fail with a timeout in rare cases,
+ // but currently a worker simply leaving the group can take this long as well.
+ private static final long REQUEST_TIMEOUT_MS = 90 * 1000;
+
+ private final Herder herder;
+ @javax.ws.rs.core.Context
+ private ServletContext context;
+
+ public ConnectorsResource(Herder herder) {
+ this.herder = herder;
+ }
+
+ @GET
+ @Path("/")
+ public Collection<String> listConnectors() throws Throwable {
+ FutureCallback<Collection<String>> cb = new FutureCallback<>();
+ herder.connectors(cb);
+ return completeOrForwardRequest(cb, "/connectors", "GET", null, new TypeReference<Collection<String>>() {
+ });
+ }
+
+ @POST
+ @Path("/")
+ public Response createConnector(final CreateConnectorRequest createRequest) throws Throwable {
+ String name = createRequest.name();
+ Map<String, String> configs = createRequest.config();
+ if (!configs.containsKey(ConnectorConfig.NAME_CONFIG))
+ configs.put(ConnectorConfig.NAME_CONFIG, name);
+
+ FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
+ herder.putConnectorConfig(name, configs, false, cb);
+ Herder.Created<ConnectorInfo> info = completeOrForwardRequest(cb, "/connectors", "POST", createRequest,
+ new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator());
+ return Response.created(URI.create("/connectors/" + name)).entity(info.result()).build();
+ }
+
+ @GET
+ @Path("/{connector}")
+ public ConnectorInfo getConnector(final @PathParam("connector") String connector) throws Throwable {
+ FutureCallback<ConnectorInfo> cb = new FutureCallback<>();
+ herder.connectorInfo(connector, cb);
+ return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", null, new TypeReference<ConnectorInfo>() {
+ });
+ }
+
+ @GET
+ @Path("/{connector}/config")
+ public Map<String, String> getConnectorConfig(final @PathParam("connector") String connector) throws Throwable {
+ FutureCallback<Map<String, String>> cb = new FutureCallback<>();
+ herder.connectorConfig(connector, cb);
+ return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", null, new TypeReference<Map<String, String>>() {
+ });
+ }
+
+ @PUT
+ @Path("/{connector}/config")
+ public Response putConnectorConfig(final @PathParam("connector") String connector,
+ final Map<String, String> connectorConfig) throws Throwable {
+ FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
+ herder.putConnectorConfig(connector, connectorConfig, true, cb);
+ Herder.Created<ConnectorInfo> createdInfo = completeOrForwardRequest(cb, "/connectors/" + connector + "/config",
+ "PUT", connectorConfig, new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator());
+ Response.ResponseBuilder response;
+ if (createdInfo.created())
+ response = Response.created(URI.create("/connectors/" + connector));
+ else
+ response = Response.ok();
+ return response.entity(createdInfo.result()).build();
+ }
+
+ @GET
+ @Path("/{connector}/tasks")
+ public List<TaskInfo> getTaskConfigs(final @PathParam("connector") String connector) throws Throwable {
+ FutureCallback<List<TaskInfo>> cb = new FutureCallback<>();
+ herder.taskConfigs(connector, cb);
+ return completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "GET", null, new TypeReference<List<TaskInfo>>() {
+ });
+ }
+
+ @POST
+ @Path("/{connector}/tasks")
+ public void putTaskConfigs(final @PathParam("connector") String connector,
+ final List<Map<String, String>> taskConfigs) throws Throwable {
+ FutureCallback<Void> cb = new FutureCallback<>();
+ herder.putTaskConfigs(connector, taskConfigs, cb);
+ completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", taskConfigs);
+ }
+
+ @DELETE
+ @Path("/{connector}")
+ public void destroyConnector(final @PathParam("connector") String connector) throws Throwable {
+ FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
+ herder.putConnectorConfig(connector, null, true, cb);
+ completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", null);
+ }
+
+ // Wait for a FutureCallback to complete. If it succeeds, return the parsed response. If it fails, try to forward the
+ // request to the leader.
+ private <T, U> T completeOrForwardRequest(
+ FutureCallback<T> cb, String path, String method, Object body, TypeReference<U> resultType,
+ Translator<T, U> translator) throws Throwable {
+ try {
+ return cb.get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof NotLeaderException) {
+ NotLeaderException notLeaderError = (NotLeaderException) e.getCause();
+ return translator.translate(RestServer.httpRequest(RestServer.urlJoin(notLeaderError.leaderUrl(), path), method, body, resultType));
+ }
+
+ throw e.getCause();
+ } catch (TimeoutException e) {
+ // This timeout is for the operation itself. None of the timeout error codes are relevant, so internal server
+ // error is the best option
+ throw new CopycatRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request timed out");
+ } catch (InterruptedException e) {
+ throw new CopycatRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request interrupted");
+ }
+ }
+
+ private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, Object body, TypeReference<T> resultType) throws Throwable {
+ return completeOrForwardRequest(cb, path, method, body, resultType, new IdentityTranslator<T>());
+ }
+
+ private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, Object body) throws Throwable {
+ return completeOrForwardRequest(cb, path, method, body, null, new IdentityTranslator<T>());
+ }
+
+ private interface Translator<T, U> {
+ T translate(RestServer.HttpResponse<U> response);
+ }
+
+ private class IdentityTranslator<T> implements Translator<T, T> {
+ @Override
+ public T translate(RestServer.HttpResponse<T> response) {
+ return response.body();
+ }
+ }
+
+ private class CreatedConnectorInfoTranslator implements Translator<Herder.Created<ConnectorInfo>, ConnectorInfo> {
+ @Override
+ public Herder.Created<ConnectorInfo> translate(RestServer.HttpResponse<ConnectorInfo> response) {
+ boolean created = response.status() == 201;
+ return new Herder.Created<>(created, response.body());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/resources/RootResource.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/resources/RootResource.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/resources/RootResource.java
new file mode 100644
index 0000000..d012c5b
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/resources/RootResource.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.rest.resources;
+
+import org.apache.kafka.copycat.runtime.rest.entities.ServerInfo;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+@Path("/")
+@Produces(MediaType.APPLICATION_JSON)
+public class RootResource {
+
+ @GET
+ @Path("/")
+ public ServerInfo serverInfo() {
+ return new ServerInfo();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConfig.java
new file mode 100644
index 0000000..246d36d
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConfig.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.standalone;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.copycat.runtime.WorkerConfig;
+
+import java.util.Properties;
+
+public class StandaloneConfig extends WorkerConfig {
+ private static final ConfigDef CONFIG;
+
+ static {
+ CONFIG = baseConfigDef();
+ }
+
+ public StandaloneConfig(Properties props) {
+ super(CONFIG, props);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
index 167ee60..24a789a 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
@@ -17,22 +17,27 @@
package org.apache.kafka.copycat.runtime.standalone;
+import org.apache.kafka.copycat.errors.AlreadyExistsException;
import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.errors.NotFoundException;
import org.apache.kafka.copycat.runtime.ConnectorConfig;
import org.apache.kafka.copycat.runtime.Herder;
import org.apache.kafka.copycat.runtime.HerderConnectorContext;
import org.apache.kafka.copycat.runtime.TaskConfig;
import org.apache.kafka.copycat.runtime.Worker;
+import org.apache.kafka.copycat.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.copycat.runtime.rest.entities.TaskInfo;
import org.apache.kafka.copycat.util.Callback;
import org.apache.kafka.copycat.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
+import java.util.List;
import java.util.Map;
-import java.util.Set;
/**
@@ -41,7 +46,7 @@ import java.util.Set;
public class StandaloneHerder implements Herder {
private static final Logger log = LoggerFactory.getLogger(StandaloneHerder.class);
- private Worker worker;
+ private final Worker worker;
private HashMap<String, ConnectorState> connectors = new HashMap<>();
public StandaloneHerder(Worker worker) {
@@ -59,40 +64,95 @@ public class StandaloneHerder implements Herder {
// There's no coordination/hand-off to do here since this is all standalone. Instead, we
// should just clean up the stuff we normally would, i.e. cleanly checkpoint and shutdown all
// the tasks.
- for (String connName : new HashSet<>(connectors.keySet()))
- stopConnector(connName);
+ for (String connName : new HashSet<>(connectors.keySet())) {
+ removeConnectorTasks(connName);
+ try {
+ worker.stopConnector(connName);
+ } catch (CopycatException e) {
+ log.error("Error shutting down connector {}: ", connName, e);
+ }
+ }
+ connectors.clear();
log.info("Herder stopped");
}
@Override
- public synchronized void addConnector(Map<String, String> connectorProps,
- Callback<String> callback) {
- try {
- ConnectorConfig connConfig = new ConnectorConfig(connectorProps);
- String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
- worker.addConnector(connConfig, new HerderConnectorContext(this, connName));
- connectors.put(connName, new ConnectorState(connConfig));
- if (callback != null)
- callback.onCompletion(null, connName);
- // This should always be a new job, create jobs from scratch
- createConnectorTasks(connName);
- } catch (CopycatException e) {
- if (callback != null)
- callback.onCompletion(e, null);
+ public synchronized void connectors(Callback<Collection<String>> callback) {
+ callback.onCompletion(null, new ArrayList<>(connectors.keySet()));
+ }
+
+ @Override
+ public synchronized void connectorInfo(String connName, Callback<ConnectorInfo> callback) {
+ ConnectorState state = connectors.get(connName);
+ if (state == null) {
+ callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
+ return;
}
+ callback.onCompletion(null, createConnectorInfo(state));
+ }
+
+ private ConnectorInfo createConnectorInfo(ConnectorState state) {
+ if (state == null)
+ return null;
+
+ List<ConnectorTaskId> taskIds = new ArrayList<>();
+ for (int i = 0; i < state.taskConfigs.size(); i++)
+ taskIds.add(new ConnectorTaskId(state.name, i));
+ return new ConnectorInfo(state.name, state.configOriginals, taskIds);
}
@Override
- public synchronized void deleteConnector(String connName, Callback<Void> callback) {
+ public void connectorConfig(String connName, final Callback<Map<String, String>> callback) {
+ // Subset of connectorInfo, so piggy back on that implementation
+ connectorInfo(connName, new Callback<ConnectorInfo>() {
+ @Override
+ public void onCompletion(Throwable error, ConnectorInfo result) {
+ if (error != null) {
+ callback.onCompletion(error, null);
+ return;
+ }
+ callback.onCompletion(null, result.config());
+ }
+ });
+ }
+
+ @Override
+ public synchronized void putConnectorConfig(String connName, final Map<String, String> config,
+ boolean allowReplace,
+ final Callback<Created<ConnectorInfo>> callback) {
try {
- stopConnector(connName);
- if (callback != null)
- callback.onCompletion(null, null);
+ boolean created = false;
+ if (connectors.containsKey(connName)) {
+ if (!allowReplace) {
+ callback.onCompletion(new AlreadyExistsException("Connector " + connName + " already exists"), null);
+ return;
+ }
+ if (config == null) // Deletion, kill tasks as well
+ removeConnectorTasks(connName);
+ worker.stopConnector(connName);
+ if (config == null)
+ connectors.remove(connName);
+ } else {
+ if (config == null) {
+ // Deletion, must already exist
+ callback.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null);
+ return;
+ }
+ created = true;
+ }
+ if (config != null) {
+ startConnector(config);
+ updateConnectorTasks(connName);
+ }
+ if (config != null)
+ callback.onCompletion(null, new Created<>(created, createConnectorInfo(connectors.get(connName))));
+ else
+ callback.onCompletion(null, new Created<ConnectorInfo>(false, null));
} catch (CopycatException e) {
- if (callback != null)
- callback.onCompletion(e, null);
+ callback.onCompletion(e, null);
}
+
}
@Override
@@ -104,68 +164,109 @@ public class StandaloneHerder implements Herder {
updateConnectorTasks(connName);
}
- // Stops a connectors tasks, then the connector
- private void stopConnector(String connName) {
- removeConnectorTasks(connName);
- try {
- worker.stopConnector(connName);
- connectors.remove(connName);
- } catch (CopycatException e) {
- log.error("Error shutting down connector {}: ", connName, e);
+ @Override
+ public synchronized void taskConfigs(String connName, Callback<List<TaskInfo>> callback) {
+ ConnectorState state = connectors.get(connName);
+ if (state == null) {
+ callback.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null);
+ return;
}
+
+ List<TaskInfo> result = new ArrayList<>();
+ for (int i = 0; i < state.taskConfigs.size(); i++) {
+ TaskInfo info = new TaskInfo(new ConnectorTaskId(connName, i), state.taskConfigs.get(i));
+ result.add(info);
+ }
+ callback.onCompletion(null, result);
}
- private void createConnectorTasks(String connName) {
+ @Override
+ public void putTaskConfigs(String connName, List<Map<String, String>> configs, Callback<Void> callback) {
+ throw new UnsupportedOperationException("Copycat in standalone mode does not support externally setting task configurations.");
+ }
+
+ /**
+ * Start a connector in the worker and record its state.
+ * @param connectorProps new connector configuration
+ * @return the connector name
+ */
+ private String startConnector(Map<String, String> connectorProps) {
+ ConnectorConfig connConfig = new ConnectorConfig(connectorProps);
+ String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
ConnectorState state = connectors.get(connName);
- Map<ConnectorTaskId, Map<String, String>> taskConfigs = worker.reconfigureConnectorTasks(connName,
+ worker.addConnector(connConfig, new HerderConnectorContext(this, connName));
+ if (state == null) {
+ connectors.put(connName, new ConnectorState(connectorProps, connConfig));
+ } else {
+ state.configOriginals = connectorProps;
+ state.config = connConfig;
+ }
+ return connName;
+ }
+
+
+ private List<Map<String, String>> recomputeTaskConfigs(String connName) {
+ ConnectorState state = connectors.get(connName);
+ return worker.connectorTaskConfigs(connName,
state.config.getInt(ConnectorConfig.TASKS_MAX_CONFIG),
state.config.getList(ConnectorConfig.TOPICS_CONFIG));
+ }
- for (Map.Entry<ConnectorTaskId, Map<String, String>> taskEntry : taskConfigs.entrySet()) {
- ConnectorTaskId taskId = taskEntry.getKey();
- TaskConfig config = new TaskConfig(taskEntry.getValue());
+ private void createConnectorTasks(String connName) {
+ ConnectorState state = connectors.get(connName);
+ int index = 0;
+ for (Map<String, String> taskConfigMap : state.taskConfigs) {
+ ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
+ TaskConfig config = new TaskConfig(taskConfigMap);
try {
worker.addTask(taskId, config);
- // We only need to store the task IDs so we can clean up.
- state.tasks.add(taskId);
} catch (Throwable e) {
log.error("Failed to add task {}: ", taskId, e);
// Swallow this so we can continue updating the rest of the tasks
// FIXME what's the proper response? Kill all the tasks? Consider this the same as a task
// that died after starting successfully.
}
+ index++;
}
}
private void removeConnectorTasks(String connName) {
ConnectorState state = connectors.get(connName);
- Iterator<ConnectorTaskId> taskIter = state.tasks.iterator();
- while (taskIter.hasNext()) {
- ConnectorTaskId taskId = taskIter.next();
+ for (int i = 0; i < state.taskConfigs.size(); i++) {
+ ConnectorTaskId taskId = new ConnectorTaskId(connName, i);
try {
worker.stopTask(taskId);
- taskIter.remove();
} catch (CopycatException e) {
log.error("Failed to stop task {}: ", taskId, e);
// Swallow this so we can continue stopping the rest of the tasks
// FIXME: Forcibly kill the task?
}
}
+ state.taskConfigs = new ArrayList<>();
}
private void updateConnectorTasks(String connName) {
- removeConnectorTasks(connName);
- createConnectorTasks(connName);
+ List<Map<String, String>> newTaskConfigs = recomputeTaskConfigs(connName);
+ ConnectorState state = connectors.get(connName);
+ if (!newTaskConfigs.equals(state.taskConfigs)) {
+ removeConnectorTasks(connName);
+ state.taskConfigs = newTaskConfigs;
+ createConnectorTasks(connName);
+ }
}
private static class ConnectorState {
+ public String name;
+ public Map<String, String> configOriginals;
public ConnectorConfig config;
- Set<ConnectorTaskId> tasks;
+ List<Map<String, String>> taskConfigs;
- public ConnectorState(ConnectorConfig config) {
+ public ConnectorState(Map<String, String> configOriginals, ConnectorConfig config) {
+ this.name = config.getString(ConnectorConfig.NAME_CONFIG);
+ this.configOriginals = configOriginals;
this.config = config;
- this.tasks = new HashSet<>();
+ this.taskConfigs = new ArrayList<>();
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
index e3e498c..d4cf824 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
@@ -17,6 +17,9 @@
package org.apache.kafka.copycat.util;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
import java.io.Serializable;
/**
@@ -27,15 +30,18 @@ public class ConnectorTaskId implements Serializable, Comparable<ConnectorTaskId
private final String connector;
private final int task;
- public ConnectorTaskId(String job, int task) {
- this.connector = job;
+ @JsonCreator
+ public ConnectorTaskId(@JsonProperty("connector") String connector, @JsonProperty("task") int task) {
+ this.connector = connector;
this.task = task;
}
+ @JsonProperty
public String connector() {
return connector;
}
+ @JsonProperty
public int task() {
return task;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConvertingFutureCallback.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConvertingFutureCallback.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConvertingFutureCallback.java
index 6bf3885..862adf9 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConvertingFutureCallback.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConvertingFutureCallback.java
@@ -70,7 +70,8 @@ public abstract class ConvertingFutureCallback<U, T> implements Callback<U>, Fut
@Override
public T get(long l, TimeUnit timeUnit)
throws InterruptedException, ExecutionException, TimeoutException {
- finishedLatch.await(l, timeUnit);
+ if (!finishedLatch.await(l, timeUnit))
+ throw new TimeoutException("Timed out waiting for future");
return result();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java
index 61e04b6..269482c 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java
@@ -23,6 +23,10 @@ public class FutureCallback<T> extends ConvertingFutureCallback<T, T> {
super(underlying);
}
+ public FutureCallback() {
+ super(null);
+ }
+
@Override
public T convert(T result) {
return result;
http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
index d33f846..e5e5b85 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
@@ -20,10 +20,10 @@ package org.apache.kafka.copycat.runtime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.copycat.cli.WorkerConfig;
import org.apache.kafka.copycat.data.Schema;
import org.apache.kafka.copycat.data.SchemaAndValue;
import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.runtime.standalone.StandaloneConfig;
import org.apache.kafka.copycat.sink.SinkRecord;
import org.apache.kafka.copycat.sink.SinkTask;
import org.apache.kafka.copycat.sink.SinkTaskContext;
@@ -101,7 +101,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
workerProps.setProperty("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("internal.key.converter.schemas.enable", "false");
workerProps.setProperty("internal.value.converter.schemas.enable", "false");
- workerConfig = new WorkerConfig(workerProps);
+ workerConfig = new StandaloneConfig(workerProps);
workerTask = PowerMock.createPartialMock(
WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"},
taskId, sinkTask, workerConfig, keyConverter, valueConverter, time);
http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
index 13d5228..566391d 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
@@ -22,8 +22,8 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.copycat.cli.WorkerConfig;
import org.apache.kafka.copycat.data.Schema;
+import org.apache.kafka.copycat.runtime.standalone.StandaloneConfig;
import org.apache.kafka.copycat.source.SourceRecord;
import org.apache.kafka.copycat.source.SourceTask;
import org.apache.kafka.copycat.source.SourceTaskContext;
@@ -96,7 +96,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
workerProps.setProperty("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("internal.key.converter.schemas.enable", "false");
workerProps.setProperty("internal.value.converter.schemas.enable", "false");
- config = new WorkerConfig(workerProps);
+ config = new StandaloneConfig(workerProps);
producerCallbacks = EasyMock.newCapture();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
index 19e1462..05015a4 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
@@ -20,11 +20,11 @@ package org.apache.kafka.copycat.runtime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.copycat.cli.WorkerConfig;
import org.apache.kafka.copycat.connector.Connector;
import org.apache.kafka.copycat.connector.ConnectorContext;
import org.apache.kafka.copycat.connector.Task;
import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.runtime.standalone.StandaloneConfig;
import org.apache.kafka.copycat.sink.SinkTask;
import org.apache.kafka.copycat.source.SourceRecord;
import org.apache.kafka.copycat.source.SourceTask;
@@ -77,7 +77,7 @@ public class WorkerTest extends ThreadedTest {
workerProps.setProperty("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("internal.key.converter.schemas.enable", "false");
workerProps.setProperty("internal.value.converter.schemas.enable", "false");
- config = new WorkerConfig(workerProps);
+ config = new StandaloneConfig(workerProps);
}
@Test
@@ -203,14 +203,14 @@ public class WorkerTest extends ThreadedTest {
} catch (CopycatException e) {
// expected
}
- Map<ConnectorTaskId, Map<String, String>> taskConfigs = worker.reconfigureConnectorTasks(CONNECTOR_ID, 2, Arrays.asList("foo", "bar"));
+ List<Map<String, String>> taskConfigs = worker.connectorTaskConfigs(CONNECTOR_ID, 2, Arrays.asList("foo", "bar"));
Properties expectedTaskProps = new Properties();
expectedTaskProps.setProperty("foo", "bar");
expectedTaskProps.setProperty(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
expectedTaskProps.setProperty(SinkTask.TOPICS_CONFIG, "foo,bar");
assertEquals(2, taskConfigs.size());
- assertEquals(expectedTaskProps, taskConfigs.get(new ConnectorTaskId(CONNECTOR_ID, 0)));
- assertEquals(expectedTaskProps, taskConfigs.get(new ConnectorTaskId(CONNECTOR_ID, 1)));
+ assertEquals(expectedTaskProps, taskConfigs.get(0));
+ assertEquals(expectedTaskProps, taskConfigs.get(1));
worker.stopConnector(CONNECTOR_ID);
assertEquals(Collections.emptySet(), worker.connectorNames());
// Nothing should be left, so this should effectively be a nop
http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
index c8b4874..8f28f5f 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
@@ -19,15 +19,22 @@ package org.apache.kafka.copycat.runtime.distributed;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.copycat.connector.ConnectorContext;
+import org.apache.kafka.copycat.errors.AlreadyExistsException;
import org.apache.kafka.copycat.runtime.ConnectorConfig;
+import org.apache.kafka.copycat.runtime.Herder;
import org.apache.kafka.copycat.runtime.TaskConfig;
import org.apache.kafka.copycat.runtime.Worker;
+import org.apache.kafka.copycat.runtime.WorkerConfig;
+import org.apache.kafka.copycat.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.copycat.runtime.rest.entities.TaskInfo;
import org.apache.kafka.copycat.source.SourceConnector;
import org.apache.kafka.copycat.source.SourceTask;
import org.apache.kafka.copycat.storage.KafkaConfigStorage;
import org.apache.kafka.copycat.util.Callback;
import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.apache.kafka.copycat.util.FutureCallback;
import org.apache.kafka.copycat.util.TestFuture;
+import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.junit.Before;
@@ -40,57 +47,86 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.TimeoutException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
@RunWith(PowerMockRunner.class)
@PrepareForTest(DistributedHerder.class)
@PowerMockIgnore("javax.management.*")
public class DistributedHerderTest {
- private static final Map<String, String> HERDER_CONFIG = new HashMap<>();
+ private static final Properties HERDER_CONFIG = new Properties();
static {
HERDER_CONFIG.put(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, "config-topic");
HERDER_CONFIG.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- HERDER_CONFIG.put(DistributedHerderConfig.GROUP_ID_CONFIG, "test-copycat-group");
+ HERDER_CONFIG.put(DistributedConfig.GROUP_ID_CONFIG, "test-copycat-group");
+ // The WorkerConfig base class has some required settings without defaults
+ HERDER_CONFIG.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.copycat.json.JsonConverter");
+ HERDER_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.copycat.json.JsonConverter");
+ HERDER_CONFIG.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.copycat.json.JsonConverter");
+ HERDER_CONFIG.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.copycat.json.JsonConverter");
}
+ private static final String MEMBER_URL = "memberUrl";
private static final String CONN1 = "sourceA";
- private static final String CONN2 = "sourceA";
+ private static final String CONN2 = "sourceB";
private static final ConnectorTaskId TASK0 = new ConnectorTaskId(CONN1, 0);
private static final ConnectorTaskId TASK1 = new ConnectorTaskId(CONN1, 1);
private static final ConnectorTaskId TASK2 = new ConnectorTaskId(CONN1, 2);
private static final Integer MAX_TASKS = 3;
- private static final Map<String, String> CONNECTOR_CONFIG = new HashMap<>();
+ private static final Map<String, String> CONN1_CONFIG = new HashMap<>();
+ static {
+ CONN1_CONFIG.put(ConnectorConfig.NAME_CONFIG, CONN1);
+ CONN1_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, MAX_TASKS.toString());
+ CONN1_CONFIG.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
+ CONN1_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSourceConnector.class.getName());
+ }
+ private static final Map<String, String> CONN1_CONFIG_UPDATED = new HashMap<>(CONN1_CONFIG);
static {
- CONNECTOR_CONFIG.put(ConnectorConfig.NAME_CONFIG, "sourceA");
- CONNECTOR_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, MAX_TASKS.toString());
- CONNECTOR_CONFIG.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
- CONNECTOR_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSourceConnector.class.getName());
+ CONN1_CONFIG_UPDATED.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar,baz");
+ }
+ private static final Map<String, String> CONN2_CONFIG = new HashMap<>();
+ static {
+ CONN2_CONFIG.put(ConnectorConfig.NAME_CONFIG, CONN2);
+ CONN2_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, MAX_TASKS.toString());
+ CONN2_CONFIG.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
+ CONN2_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSourceConnector.class.getName());
}
private static final Map<String, String> TASK_CONFIG = new HashMap<>();
static {
TASK_CONFIG.put(TaskConfig.TASK_CLASS_CONFIG, BogusSourceTask.class.getName());
}
- private static final HashMap<ConnectorTaskId, Map<String, String>> TASK_CONFIGS = new HashMap<>();
+ private static final List<Map<String, String>> TASK_CONFIGS = new ArrayList<>();
static {
- TASK_CONFIGS.put(TASK0, TASK_CONFIG);
- TASK_CONFIGS.put(TASK1, TASK_CONFIG);
- TASK_CONFIGS.put(TASK2, TASK_CONFIG);
+ TASK_CONFIGS.add(TASK_CONFIG);
+ TASK_CONFIGS.add(TASK_CONFIG);
+ TASK_CONFIGS.add(TASK_CONFIG);
+ }
+ private static final HashMap<ConnectorTaskId, Map<String, String>> TASK_CONFIGS_MAP = new HashMap<>();
+ static {
+ TASK_CONFIGS_MAP.put(TASK0, TASK_CONFIG);
+ TASK_CONFIGS_MAP.put(TASK1, TASK_CONFIG);
+ TASK_CONFIGS_MAP.put(TASK2, TASK_CONFIG);
}
private static final ClusterConfigState SNAPSHOT = new ClusterConfigState(1, Collections.singletonMap(CONN1, 3),
- Collections.singletonMap(CONN1, CONNECTOR_CONFIG), TASK_CONFIGS, Collections.<String>emptySet());
+ Collections.singletonMap(CONN1, CONN1_CONFIG), TASK_CONFIGS_MAP, Collections.<String>emptySet());
+ private static final ClusterConfigState SNAPSHOT_UPDATED_CONN1_CONFIG = new ClusterConfigState(1, Collections.singletonMap(CONN1, 3),
+ Collections.singletonMap(CONN1, CONN1_CONFIG_UPDATED), TASK_CONFIGS_MAP, Collections.<String>emptySet());
@Mock private KafkaConfigStorage configStorage;
@Mock private WorkerGroupMember member;
private DistributedHerder herder;
@Mock private Worker worker;
- @Mock private Callback<String> createCallback;
- @Mock private Callback<Void> destroyCallback;
+ @Mock private Callback<Herder.Created<ConnectorInfo>> putConnectorCallback;
private Callback<String> connectorConfigCallback;
private Callback<List<ConnectorTaskId>> taskConfigCallback;
@@ -101,7 +137,7 @@ public class DistributedHerderTest {
worker = PowerMock.createMock(Worker.class);
herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff"},
- worker, HERDER_CONFIG, configStorage, member);
+ new DistributedConfig(HERDER_CONFIG), worker, configStorage, member, MEMBER_URL);
connectorConfigCallback = Whitebox.invokeMethod(herder, "connectorConfigCallback");
taskConfigCallback = Whitebox.invokeMethod(herder, "taskConfigCallback");
rebalanceListener = Whitebox.invokeMethod(herder, "rebalanceListener");
@@ -115,7 +151,7 @@ public class DistributedHerderTest {
expectPostRebalanceCatchup(SNAPSHOT);
worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
PowerMock.expectLastCall();
- EasyMock.expect(worker.reconfigureConnectorTasks(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
worker.addTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject());
PowerMock.expectLastCall();
member.poll(EasyMock.anyInt());
@@ -156,9 +192,11 @@ public class DistributedHerderTest {
member.wakeup();
PowerMock.expectLastCall();
- configStorage.putConnectorConfig(CONN1, CONNECTOR_CONFIG);
+ // CONN2 is new, should succeed
+ configStorage.putConnectorConfig(CONN2, CONN2_CONFIG);
PowerMock.expectLastCall();
- createCallback.onCompletion(null, CONN1);
+ ConnectorInfo info = new ConnectorInfo(CONN2, CONN2_CONFIG, Collections.<ConnectorTaskId>emptyList());
+ putConnectorCallback.onCompletion(null, new Herder.Created<>(true, info));
PowerMock.expectLastCall();
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
@@ -166,7 +204,30 @@ public class DistributedHerderTest {
PowerMock.replayAll();
- herder.addConnector(CONNECTOR_CONFIG, createCallback);
+ herder.putConnectorConfig(CONN2, CONN2_CONFIG, false, putConnectorCallback);
+ herder.tick();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testCreateConnectorAlreadyExists() throws Exception {
+ EasyMock.expect(member.memberId()).andStubReturn("leader");
+ expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
+ expectPostRebalanceCatchup(SNAPSHOT);
+
+ member.wakeup();
+ PowerMock.expectLastCall();
+ // CONN1 already exists
+ putConnectorCallback.onCompletion(EasyMock.<AlreadyExistsException>anyObject(), EasyMock.<Herder.Created<ConnectorInfo>>isNull());
+ PowerMock.expectLastCall();
+ member.poll(EasyMock.anyInt());
+ PowerMock.expectLastCall();
+ // No immediate action besides this -- change will be picked up via the config log
+
+ PowerMock.replayAll();
+
+ herder.putConnectorConfig(CONN1, CONN1_CONFIG, false, putConnectorCallback);
herder.tick();
PowerMock.verifyAll();
@@ -180,14 +241,14 @@ public class DistributedHerderTest {
expectPostRebalanceCatchup(SNAPSHOT);
worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
PowerMock.expectLastCall();
- EasyMock.expect(worker.reconfigureConnectorTasks(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
// And delete the connector
member.wakeup();
PowerMock.expectLastCall();
configStorage.putConnectorConfig(CONN1, null);
PowerMock.expectLastCall();
- destroyCallback.onCompletion(null, null);
+ putConnectorCallback.onCompletion(null, new Herder.Created<ConnectorInfo>(false, null));
PowerMock.expectLastCall();
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
@@ -195,7 +256,7 @@ public class DistributedHerderTest {
PowerMock.replayAll();
- herder.deleteConnector(CONN1, destroyCallback);
+ herder.putConnectorConfig(CONN1, null, true, putConnectorCallback);
herder.tick();
PowerMock.verifyAll();
@@ -224,7 +285,7 @@ public class DistributedHerderTest {
CopycatProtocol.Assignment.NO_ERROR, 1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
PowerMock.expectLastCall();
- EasyMock.expect(worker.reconfigureConnectorTasks(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
@@ -250,7 +311,7 @@ public class DistributedHerderTest {
expectPostRebalanceCatchup(SNAPSHOT);
worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
PowerMock.expectLastCall();
- EasyMock.expect(worker.reconfigureConnectorTasks(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
@@ -263,7 +324,7 @@ public class DistributedHerderTest {
PowerMock.expectLastCall();
worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
PowerMock.expectLastCall();
- EasyMock.expect(worker.reconfigureConnectorTasks(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
@@ -322,7 +383,7 @@ public class DistributedHerderTest {
TestFuture<Void> readToEndFuture = new TestFuture<>();
readToEndFuture.resolveOnGet(new TimeoutException());
EasyMock.expect(configStorage.readToEnd()).andReturn(readToEndFuture);
- PowerMock.expectPrivate(herder, "backoff", DistributedHerderConfig.WORKER_UNSYNC_BACKOFF_MS_DEFAULT);
+ PowerMock.expectPrivate(herder, "backoff", DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_DEFAULT);
member.requestRejoin();
// After backoff, restart the process and this time succeed
@@ -331,7 +392,7 @@ public class DistributedHerderTest {
worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
PowerMock.expectLastCall();
- EasyMock.expect(worker.reconfigureConnectorTasks(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
worker.addTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject());
PowerMock.expectLastCall();
member.poll(EasyMock.anyInt());
@@ -346,6 +407,123 @@ public class DistributedHerderTest {
}
@Test
+ public void testAccessors() throws Exception {
+ EasyMock.expect(member.memberId()).andStubReturn("leader");
+ expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
+ expectPostRebalanceCatchup(SNAPSHOT);
+
+
+ member.wakeup();
+ PowerMock.expectLastCall().anyTimes();
+ // list connectors, get connector info, get connector config, get task configs
+ member.poll(EasyMock.anyInt());
+ PowerMock.expectLastCall();
+
+
+ PowerMock.replayAll();
+
+ FutureCallback<Collection<String>> listConnectorsCb = new FutureCallback<>();
+ herder.connectors(listConnectorsCb);
+ FutureCallback<ConnectorInfo> connectorInfoCb = new FutureCallback<>();
+ herder.connectorInfo(CONN1, connectorInfoCb);
+ FutureCallback<Map<String, String>> connectorConfigCb = new FutureCallback<>();
+ herder.connectorConfig(CONN1, connectorConfigCb);
+ FutureCallback<List<TaskInfo>> taskConfigsCb = new FutureCallback<>();
+ herder.taskConfigs(CONN1, taskConfigsCb);
+
+ herder.tick();
+ assertTrue(listConnectorsCb.isDone());
+ assertEquals(Collections.singleton(CONN1), listConnectorsCb.get());
+ assertTrue(connectorInfoCb.isDone());
+ ConnectorInfo info = new ConnectorInfo(CONN1, CONN1_CONFIG, Arrays.asList(TASK0, TASK1, TASK2));
+ assertEquals(info, connectorInfoCb.get());
+ assertTrue(connectorConfigCb.isDone());
+ assertEquals(CONN1_CONFIG, connectorConfigCb.get());
+ assertTrue(taskConfigsCb.isDone());
+ assertEquals(Arrays.asList(
+ new TaskInfo(TASK0, TASK_CONFIG),
+ new TaskInfo(TASK1, TASK_CONFIG),
+ new TaskInfo(TASK2, TASK_CONFIG)),
+ taskConfigsCb.get());
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testPutConnectorConfig() throws Exception {
+ EasyMock.expect(member.memberId()).andStubReturn("leader");
+ expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
+ expectPostRebalanceCatchup(SNAPSHOT);
+ worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+ PowerMock.expectLastCall();
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+
+ // list connectors, get connector info, get connector config, get task configs
+ member.wakeup();
+ PowerMock.expectLastCall().anyTimes();
+ member.poll(EasyMock.anyInt());
+ PowerMock.expectLastCall();
+
+ // Poll loop for second round of calls
+ member.ensureActive();
+ PowerMock.expectLastCall();
+ configStorage.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED);
+ PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+ @Override
+ public Object answer() throws Throwable {
+ // Simulate response to writing config + waiting until end of log to be read
+ connectorConfigCallback.onCompletion(null, CONN1);
+ return null;
+ }
+ });
+ // As a result of reconfig, should need to update snapshot. With only connector updates, we'll just restart
+ // connector without rebalance
+ EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_UPDATED_CONN1_CONFIG);
+ worker.stopConnector(CONN1);
+ PowerMock.expectLastCall();
+ Capture<ConnectorConfig> capturedUpdatedConfig = EasyMock.newCapture();
+ worker.addConnector(EasyMock.capture(capturedUpdatedConfig), EasyMock.<ConnectorContext>anyObject());
+ PowerMock.expectLastCall();
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+ member.poll(EasyMock.anyInt());
+ PowerMock.expectLastCall();
+
+ // Third tick just to read the config
+ member.ensureActive();
+ PowerMock.expectLastCall();
+ member.poll(EasyMock.anyInt());
+ PowerMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ // Should pick up original config
+ FutureCallback<Map<String, String>> connectorConfigCb = new FutureCallback<>();
+ herder.connectorConfig(CONN1, connectorConfigCb);
+ herder.tick();
+ assertTrue(connectorConfigCb.isDone());
+ assertEquals(CONN1_CONFIG, connectorConfigCb.get());
+
+ // Apply new config.
+ FutureCallback<Herder.Created<ConnectorInfo>> putConfigCb = new FutureCallback<>();
+ herder.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED, true, putConfigCb);
+ herder.tick();
+ assertTrue(putConfigCb.isDone());
+ ConnectorInfo updatedInfo = new ConnectorInfo(CONN1, CONN1_CONFIG_UPDATED, Arrays.asList(TASK0, TASK1, TASK2));
+ assertEquals(new Herder.Created<>(false, updatedInfo), putConfigCb.get());
+
+ // Check config again to validate change
+ connectorConfigCb = new FutureCallback<>();
+ herder.connectorConfig(CONN1, connectorConfigCb);
+ herder.tick();
+ assertTrue(connectorConfigCb.isDone());
+ assertEquals(CONN1_CONFIG_UPDATED, connectorConfigCb.get());
+ // The config passed to Worker should
+ assertEquals(Arrays.asList("foo", "bar", "baz"),
+ capturedUpdatedConfig.getValue().getList(ConnectorConfig.TOPICS_CONFIG));
+ PowerMock.verifyAll();
+ }
+
+ @Test
public void testInconsistentConfigs() throws Exception {
// FIXME: if we have inconsistent configs, we need to request forced reconfig + write of the connector's task configs
// This requires inter-worker communication, so needs the REST API
@@ -366,7 +544,7 @@ public class DistributedHerderTest {
if (revokedConnectors != null)
rebalanceListener.onRevoked("leader", revokedConnectors, revokedTasks);
CopycatProtocol.Assignment assignment = new CopycatProtocol.Assignment(
- error, "leader", offset, assignedConnectors, assignedTasks);
+ error, "leader", "leaderUrl", offset, assignedConnectors, assignedTasks);
rebalanceListener.onAssigned(assignment);
return null;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
index 2278045..ca53674 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
@@ -55,6 +55,9 @@ import static org.junit.Assert.assertFalse;
public class WorkerCoordinatorTest {
+ private static final String LEADER_URL = "leaderUrl:8083";
+ private static final String MEMBER_URL = "memberUrl:8083";
+
private String connectorId = "connector";
private String connectorId2 = "connector2";
private ConnectorTaskId taskId0 = new ConnectorTaskId(connectorId, 0);
@@ -104,6 +107,7 @@ public class WorkerCoordinatorTest {
time,
requestTimeoutMs,
retryBackoffMs,
+ LEADER_URL,
configStorage,
rebalanceListener);
@@ -147,7 +151,7 @@ public class WorkerCoordinatorTest {
LinkedHashMap<String, ByteBuffer> serialized = coordinator.metadata();
assertEquals(1, serialized.size());
- CopycatProtocol.ConfigState state = CopycatProtocol.deserializeMetadata(serialized.get(WorkerCoordinator.DEFAULT_SUBPROTOCOL));
+ CopycatProtocol.WorkerState state = CopycatProtocol.deserializeMetadata(serialized.get(WorkerCoordinator.DEFAULT_SUBPROTOCOL));
assertEquals(1, state.offset());
PowerMock.verifyAll();
@@ -322,8 +326,8 @@ public class WorkerCoordinatorTest {
Map<String, ByteBuffer> configs = new HashMap<>();
// Mark everyone as in sync with configState1
- configs.put("leader", CopycatProtocol.serializeMetadata(new CopycatProtocol.ConfigState(1L)));
- configs.put("member", CopycatProtocol.serializeMetadata(new CopycatProtocol.ConfigState(1L)));
+ configs.put("leader", CopycatProtocol.serializeMetadata(new CopycatProtocol.WorkerState(LEADER_URL, 1L)));
+ configs.put("member", CopycatProtocol.serializeMetadata(new CopycatProtocol.WorkerState(MEMBER_URL, 1L)));
Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "performAssignment", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, configs);
// configState1 has 1 connector, 1 task
@@ -358,8 +362,8 @@ public class WorkerCoordinatorTest {
Map<String, ByteBuffer> configs = new HashMap<>();
// Mark everyone as in sync with configState1
- configs.put("leader", CopycatProtocol.serializeMetadata(new CopycatProtocol.ConfigState(1L)));
- configs.put("member", CopycatProtocol.serializeMetadata(new CopycatProtocol.ConfigState(1L)));
+ configs.put("leader", CopycatProtocol.serializeMetadata(new CopycatProtocol.WorkerState(LEADER_URL, 1L)));
+ configs.put("member", CopycatProtocol.serializeMetadata(new CopycatProtocol.WorkerState(MEMBER_URL, 1L)));
Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "performAssignment", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, configs);
// configState2 has 2 connector, 3 tasks and should trigger round robin assignment
@@ -390,7 +394,10 @@ public class WorkerCoordinatorTest {
Map<String, Long> configOffsets, short error) {
Map<String, ByteBuffer> metadata = new HashMap<>();
for (Map.Entry<String, Long> configStateEntry : configOffsets.entrySet()) {
- ByteBuffer buf = CopycatProtocol.serializeMetadata(new CopycatProtocol.ConfigState(configStateEntry.getValue()));
+ // We need a member URL, but it doesn't matter for the purposes of this test. Just set it to the member ID
+ String memberUrl = configStateEntry.getKey();
+ long configOffset = configStateEntry.getValue();
+ ByteBuffer buf = CopycatProtocol.serializeMetadata(new CopycatProtocol.WorkerState(memberUrl, configOffset));
metadata.put(configStateEntry.getKey(), buf);
}
return new JoinGroupResponse(error, generationId, WorkerCoordinator.DEFAULT_SUBPROTOCOL, memberId, memberId, metadata).toStruct();
@@ -403,7 +410,7 @@ public class WorkerCoordinatorTest {
private Struct syncGroupResponse(short assignmentError, String leader, long configOffset, List<String> connectorIds,
List<ConnectorTaskId> taskIds, short error) {
- CopycatProtocol.Assignment assignment = new CopycatProtocol.Assignment(assignmentError, leader, configOffset, connectorIds, taskIds);
+ CopycatProtocol.Assignment assignment = new CopycatProtocol.Assignment(assignmentError, leader, LEADER_URL, configOffset, connectorIds, taskIds);
ByteBuffer buf = CopycatProtocol.serializeAssignment(assignment);
return new SyncGroupResponse(error, buf).toStruct();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/rest/resources/ConnectorsResourceTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/rest/resources/ConnectorsResourceTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/rest/resources/ConnectorsResourceTest.java
new file mode 100644
index 0000000..c987092
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/rest/resources/ConnectorsResourceTest.java
@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.rest.resources;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.kafka.copycat.errors.AlreadyExistsException;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.errors.NotFoundException;
+import org.apache.kafka.copycat.runtime.ConnectorConfig;
+import org.apache.kafka.copycat.runtime.Herder;
+import org.apache.kafka.copycat.runtime.distributed.NotLeaderException;
+import org.apache.kafka.copycat.runtime.rest.RestServer;
+import org.apache.kafka.copycat.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.copycat.runtime.rest.entities.CreateConnectorRequest;
+import org.apache.kafka.copycat.runtime.rest.entities.TaskInfo;
+import org.apache.kafka.copycat.util.Callback;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(RestServer.class)
+@PowerMockIgnore("javax.management.*")
+public class ConnectorsResourceTest {
+ // Note trailing / and that we do *not* use LEADER_URL to construct our reference values. This checks that we handle
+ // URL construction properly, avoiding //, which will mess up routing in the REST server
+ private static final String LEADER_URL = "http://leader:8083/";
+ private static final String CONNECTOR_NAME = "test";
+ private static final String CONNECTOR2_NAME = "test2";
+ private static final Map<String, String> CONNECTOR_CONFIG = new HashMap<>();
+ static {
+ CONNECTOR_CONFIG.put("name", CONNECTOR_NAME);
+ CONNECTOR_CONFIG.put("sample_config", "test_config");
+ }
+ private static final List<ConnectorTaskId> CONNECTOR_TASK_NAMES = Arrays.asList(
+ new ConnectorTaskId(CONNECTOR_NAME, 0),
+ new ConnectorTaskId(CONNECTOR_NAME, 1)
+ );
+ private static final List<Map<String, String>> TASK_CONFIGS = new ArrayList<>();
+ static {
+ TASK_CONFIGS.add(Collections.singletonMap("config", "value"));
+ TASK_CONFIGS.add(Collections.singletonMap("config", "other_value"));
+ }
+ private static final List<TaskInfo> TASK_INFOS = new ArrayList<>();
+ static {
+ TASK_INFOS.add(new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 0), TASK_CONFIGS.get(0)));
+ TASK_INFOS.add(new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 1), TASK_CONFIGS.get(1)));
+ }
+
+
+ @Mock
+ private Herder herder;
+ private ConnectorsResource connectorsResource;
+
+ @Before
+ public void setUp() throws NoSuchMethodException {
+ PowerMock.mockStatic(RestServer.class,
+ RestServer.class.getMethod("httpRequest", String.class, String.class, Object.class, TypeReference.class));
+ connectorsResource = new ConnectorsResource(herder);
+ }
+
+ @Test
+ public void testListConnectors() throws Throwable {
+ final Capture<Callback<Collection<String>>> cb = Capture.newInstance();
+ herder.connectors(EasyMock.capture(cb));
+ expectAndCallbackResult(cb, Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME));
+
+ PowerMock.replayAll();
+
+ Collection<String> connectors = connectorsResource.listConnectors();
+ // Ordering isn't guaranteed, compare sets
+ assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), new HashSet<>(connectors));
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testListConnectorsNotLeader() throws Throwable {
+ final Capture<Callback<Collection<String>>> cb = Capture.newInstance();
+ herder.connectors(EasyMock.capture(cb));
+ expectAndCallbackNotLeaderException(cb);
+ // Should forward request
+ EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors"), EasyMock.eq("GET"),
+ EasyMock.isNull(), EasyMock.anyObject(TypeReference.class)))
+ .andReturn(new RestServer.HttpResponse<>(200, new HashMap<String, List<String>>(), Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME)));
+
+ PowerMock.replayAll();
+
+ Collection<String> connectors = connectorsResource.listConnectors();
+ // Ordering isn't guaranteed, compare sets
+ assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), new HashSet<>(connectors));
+
+ PowerMock.verifyAll();
+ }
+
+ @Test(expected = CopycatException.class)
+ public void testListConnectorsNotSynced() throws Throwable {
+ final Capture<Callback<Collection<String>>> cb = Capture.newInstance();
+ herder.connectors(EasyMock.capture(cb));
+ expectAndCallbackException(cb, new CopycatException("not synced"));
+
+ PowerMock.replayAll();
+
+ // throws
+ connectorsResource.listConnectors();
+ }
+
+ @Test
+ public void testCreateConnector() throws Throwable {
+ CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
+
+ final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
+ herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
+ expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES)));
+
+ PowerMock.replayAll();
+
+ connectorsResource.createConnector(body);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testCreateConnectorNotLeader() throws Throwable {
+ CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
+
+ final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
+ herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
+ expectAndCallbackNotLeaderException(cb);
+ // Should forward request
+ EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors"), EasyMock.eq("POST"), EasyMock.eq(body), EasyMock.<TypeReference>anyObject()))
+ .andReturn(new RestServer.HttpResponse<>(201, new HashMap<String, List<String>>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES)));
+
+ PowerMock.replayAll();
+
+ connectorsResource.createConnector(body);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test(expected = AlreadyExistsException.class)
+ public void testCreateConnectorExists() throws Throwable {
+ CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
+
+ final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
+ herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
+ expectAndCallbackException(cb, new AlreadyExistsException("already exists"));
+
+ PowerMock.replayAll();
+
+ connectorsResource.createConnector(body);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testDeleteConnector() throws Throwable {
+ final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
+ herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.<Map<String, String>>isNull(), EasyMock.eq(true), EasyMock.capture(cb));
+ expectAndCallbackResult(cb, null);
+
+ PowerMock.replayAll();
+
+ connectorsResource.destroyConnector(CONNECTOR_NAME);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testDeleteConnectorNotLeader() throws Throwable {
+ final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
+ herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.<Map<String, String>>isNull(), EasyMock.eq(true), EasyMock.capture(cb));
+ expectAndCallbackNotLeaderException(cb);
+ // Should forward request
+ EasyMock.expect(RestServer.httpRequest("http://leader:8083/connectors/" + CONNECTOR_NAME, "DELETE", null, null))
+ .andReturn(new RestServer.HttpResponse<>(204, new HashMap<String, List<String>>(), null));
+
+ PowerMock.replayAll();
+
+ connectorsResource.destroyConnector(CONNECTOR_NAME);
+
+ PowerMock.verifyAll();
+ }
+
+ // Not found exceptions should pass through to caller so they can be processed for 404s
+ @Test(expected = NotFoundException.class)
+ public void testDeleteConnectorNotFound() throws Throwable {
+ final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
+ herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.<Map<String, String>>isNull(), EasyMock.eq(true), EasyMock.capture(cb));
+ expectAndCallbackException(cb, new NotFoundException("not found"));
+
+ PowerMock.replayAll();
+
+ connectorsResource.destroyConnector(CONNECTOR_NAME);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testGetConnector() throws Throwable {
+ final Capture<Callback<ConnectorInfo>> cb = Capture.newInstance();
+ herder.connectorInfo(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb));
+ expectAndCallbackResult(cb, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES));
+
+ PowerMock.replayAll();
+
+ ConnectorInfo connInfo = connectorsResource.getConnector(CONNECTOR_NAME);
+ assertEquals(new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES), connInfo);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testGetConnectorConfig() throws Throwable {
+ final Capture<Callback<Map<String, String>>> cb = Capture.newInstance();
+ herder.connectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb));
+ expectAndCallbackResult(cb, CONNECTOR_CONFIG);
+
+ PowerMock.replayAll();
+
+ Map<String, String> connConfig = connectorsResource.getConnectorConfig(CONNECTOR_NAME);
+ assertEquals(CONNECTOR_CONFIG, connConfig);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test(expected = NotFoundException.class)
+ public void testGetConnectorConfigConnectorNotFound() throws Throwable {
+ final Capture<Callback<Map<String, String>>> cb = Capture.newInstance();
+ herder.connectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb));
+ expectAndCallbackException(cb, new NotFoundException("not found"));
+
+ PowerMock.replayAll();
+
+ connectorsResource.getConnectorConfig(CONNECTOR_NAME);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testPutConnectorConfig() throws Throwable {
+ final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
+ herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(CONNECTOR_CONFIG), EasyMock.eq(true), EasyMock.capture(cb));
+ expectAndCallbackResult(cb, new Herder.Created<>(false, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES)));
+
+ PowerMock.replayAll();
+
+ connectorsResource.putConnectorConfig(CONNECTOR_NAME, CONNECTOR_CONFIG);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testGetConnectorTaskConfigs() throws Throwable {
+ final Capture<Callback<List<TaskInfo>>> cb = Capture.newInstance();
+ herder.taskConfigs(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb));
+ expectAndCallbackResult(cb, TASK_INFOS);
+
+ PowerMock.replayAll();
+
+ List<TaskInfo> taskInfos = connectorsResource.getTaskConfigs(CONNECTOR_NAME);
+ assertEquals(TASK_INFOS, taskInfos);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test(expected = NotFoundException.class)
+ public void testGetConnectorTaskConfigsConnectorNotFound() throws Throwable {
+ final Capture<Callback<List<TaskInfo>>> cb = Capture.newInstance();
+ herder.taskConfigs(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb));
+ expectAndCallbackException(cb, new NotFoundException("connector not found"));
+
+ PowerMock.replayAll();
+
+ connectorsResource.getTaskConfigs(CONNECTOR_NAME);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testPutConnectorTaskConfigs() throws Throwable {
+ final Capture<Callback<Void>> cb = Capture.newInstance();
+ herder.putTaskConfigs(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(TASK_CONFIGS), EasyMock.capture(cb));
+ expectAndCallbackResult(cb, null);
+
+ PowerMock.replayAll();
+
+ connectorsResource.putTaskConfigs(CONNECTOR_NAME, TASK_CONFIGS);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test(expected = NotFoundException.class)
+ public void testPutConnectorTaskConfigsConnectorNotFound() throws Throwable {
+ final Capture<Callback<Void>> cb = Capture.newInstance();
+ herder.putTaskConfigs(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(TASK_CONFIGS), EasyMock.capture(cb));
+ expectAndCallbackException(cb, new NotFoundException("not found"));
+
+ PowerMock.replayAll();
+
+ connectorsResource.putTaskConfigs(CONNECTOR_NAME, TASK_CONFIGS);
+
+ PowerMock.verifyAll();
+ }
+
+ private <T> void expectAndCallbackResult(final Capture<Callback<T>> cb, final T value) {
+ PowerMock.expectLastCall().andAnswer(new IAnswer<Void>() {
+ @Override
+ public Void answer() throws Throwable {
+ cb.getValue().onCompletion(null, value);
+ return null;
+ }
+ });
+ }
+
+ private <T> void expectAndCallbackException(final Capture<Callback<T>> cb, final Throwable t) {
+ PowerMock.expectLastCall().andAnswer(new IAnswer<Void>() {
+ @Override
+ public Void answer() throws Throwable {
+ cb.getValue().onCompletion(t, null);
+ return null;
+ }
+ });
+ }
+
+ private <T> void expectAndCallbackNotLeaderException(final Capture<Callback<T>> cb) {
+ expectAndCallbackException(cb, new NotLeaderException("not leader test", LEADER_URL));
+ }
+}