You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/11/09 07:11:25 UTC

[06/26] kafka git commit: KAFKA-2774: Rename Copycat to Kafka Connect

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/CreateConnectorRequest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/CreateConnectorRequest.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/CreateConnectorRequest.java
deleted file mode 100644
index 02ff08b..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/CreateConnectorRequest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime.rest.entities;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.Map;
-import java.util.Objects;
-
-public class CreateConnectorRequest {
-    private final String name;
-    private final Map<String, String> config;
-
-    @JsonCreator
-    public CreateConnectorRequest(@JsonProperty("name") String name, @JsonProperty("config") Map<String, String> config) {
-        this.name = name;
-        this.config = config;
-    }
-
-    @JsonProperty
-    public String name() {
-        return name;
-    }
-
-    @JsonProperty
-    public Map<String, String> config() {
-        return config;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        CreateConnectorRequest that = (CreateConnectorRequest) o;
-        return Objects.equals(name, that.name) &&
-                Objects.equals(config, that.config);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(name, config);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/ErrorMessage.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/ErrorMessage.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/ErrorMessage.java
deleted file mode 100644
index 6cbc140..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/ErrorMessage.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime.rest.entities;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.Objects;
-
-/**
- * Standard error format for all REST API failures. These are generated automatically by
- * {@link org.apache.kafka.copycat.runtime.rest.errors.CopycatExceptionMapper} in response to uncaught
- * {@link org.apache.kafka.copycat.errors.CopycatException}s.
- */
-public class ErrorMessage {
-    private final int errorCode;
-    private final String message;
-
-    @JsonCreator
-    public ErrorMessage(@JsonProperty("error_code") int errorCode, @JsonProperty("message") String message) {
-        this.errorCode = errorCode;
-        this.message = message;
-    }
-
-    @JsonProperty("error_code")
-    public int errorCode() {
-        return errorCode;
-    }
-
-    @JsonProperty
-    public String message() {
-        return message;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        ErrorMessage that = (ErrorMessage) o;
-        return Objects.equals(errorCode, that.errorCode) &&
-                Objects.equals(message, that.message);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(errorCode, message);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/ServerInfo.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/ServerInfo.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/ServerInfo.java
deleted file mode 100644
index 5393163..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/ServerInfo.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime.rest.entities;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.common.utils.AppInfoParser;
-
-public class ServerInfo {
-    private String version;
-    private String commit;
-
-    public ServerInfo() {
-        version = AppInfoParser.getVersion();
-        commit = AppInfoParser.getCommitId();
-    }
-
-    @JsonProperty
-    public String version() {
-        return version;
-    }
-
-    @JsonProperty
-    public String commit() {
-        return commit;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/TaskInfo.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/TaskInfo.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/TaskInfo.java
deleted file mode 100644
index 9206ea0..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/TaskInfo.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime.rest.entities;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.copycat.util.ConnectorTaskId;
-
-import java.util.Map;
-import java.util.Objects;
-
-public class TaskInfo {
-    private final ConnectorTaskId id;
-    private final Map<String, String> config;
-
-    public TaskInfo(ConnectorTaskId id, Map<String, String> config) {
-        this.id = id;
-        this.config = config;
-    }
-
-    @JsonProperty
-    public ConnectorTaskId id() {
-        return id;
-    }
-
-    @JsonProperty
-    public Map<String, String> config() {
-        return config;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        TaskInfo taskInfo = (TaskInfo) o;
-        return Objects.equals(id, taskInfo.id) &&
-                Objects.equals(config, taskInfo.config);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(id, config);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/errors/CopycatExceptionMapper.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/errors/CopycatExceptionMapper.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/errors/CopycatExceptionMapper.java
deleted file mode 100644
index 760200c..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/errors/CopycatExceptionMapper.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime.rest.errors;
-
-import org.apache.kafka.copycat.errors.AlreadyExistsException;
-import org.apache.kafka.copycat.errors.CopycatException;
-import org.apache.kafka.copycat.errors.NotFoundException;
-import org.apache.kafka.copycat.runtime.rest.entities.ErrorMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.ws.rs.core.Response;
-import javax.ws.rs.ext.ExceptionMapper;
-
-public class CopycatExceptionMapper implements ExceptionMapper<CopycatException> {
-    private static final Logger log = LoggerFactory.getLogger(CopycatExceptionMapper.class);
-
-    @Override
-    public Response toResponse(CopycatException exception) {
-        log.debug("Uncaught exception in REST call: ", exception);
-
-        if (exception instanceof CopycatRestException) {
-            CopycatRestException restException = (CopycatRestException) exception;
-            return Response.status(restException.statusCode())
-                    .entity(new ErrorMessage(restException.errorCode(), restException.getMessage()))
-                    .build();
-        }
-
-        if (exception instanceof NotFoundException) {
-            return Response.status(Response.Status.NOT_FOUND)
-                    .entity(new ErrorMessage(Response.Status.NOT_FOUND.getStatusCode(), exception.getMessage()))
-                    .build();
-        }
-
-        if (exception instanceof AlreadyExistsException) {
-            return Response.status(Response.Status.CONFLICT)
-                    .entity(new ErrorMessage(Response.Status.CONFLICT.getStatusCode(), exception.getMessage()))
-                    .build();
-        }
-
-        return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
-                .entity(new ErrorMessage(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), exception.getMessage()))
-                .build();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/errors/CopycatRestException.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/errors/CopycatRestException.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/errors/CopycatRestException.java
deleted file mode 100644
index efcf69d..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/errors/CopycatRestException.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime.rest.errors;
-
-import org.apache.kafka.copycat.errors.CopycatException;
-
-import javax.ws.rs.core.Response;
-
-public class CopycatRestException extends CopycatException {
-    private final int statusCode;
-    private final int errorCode;
-
-    public CopycatRestException(int statusCode, int errorCode, String message, Throwable t) {
-        super(message, t);
-        this.statusCode = statusCode;
-        this.errorCode = errorCode;
-    }
-
-    public CopycatRestException(Response.Status status, int errorCode, String message, Throwable t) {
-        this(status.getStatusCode(), errorCode, message, t);
-    }
-
-    public CopycatRestException(int statusCode, int errorCode, String message) {
-        this(statusCode, errorCode, message, null);
-    }
-
-    public CopycatRestException(Response.Status status, int errorCode, String message) {
-        this(status, errorCode, message, null);
-    }
-
-    public CopycatRestException(int statusCode, String message, Throwable t) {
-        this(statusCode, statusCode, message, t);
-    }
-
-    public CopycatRestException(Response.Status status, String message, Throwable t) {
-        this(status, status.getStatusCode(), message, t);
-    }
-
-    public CopycatRestException(int statusCode, String message) {
-        this(statusCode, statusCode, message, null);
-    }
-
-    public CopycatRestException(Response.Status status, String message) {
-        this(status.getStatusCode(), status.getStatusCode(), message, null);
-    }
-
-
-    public int statusCode() {
-        return statusCode;
-    }
-
-    public int errorCode() {
-        return errorCode;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/resources/ConnectorsResource.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/resources/ConnectorsResource.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/resources/ConnectorsResource.java
deleted file mode 100644
index 823155e..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/resources/ConnectorsResource.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime.rest.resources;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import org.apache.kafka.copycat.runtime.ConnectorConfig;
-import org.apache.kafka.copycat.runtime.Herder;
-import org.apache.kafka.copycat.runtime.distributed.NotLeaderException;
-import org.apache.kafka.copycat.runtime.rest.RestServer;
-import org.apache.kafka.copycat.runtime.rest.entities.ConnectorInfo;
-import org.apache.kafka.copycat.runtime.rest.entities.CreateConnectorRequest;
-import org.apache.kafka.copycat.runtime.rest.entities.TaskInfo;
-import org.apache.kafka.copycat.runtime.rest.errors.CopycatRestException;
-import org.apache.kafka.copycat.util.FutureCallback;
-
-import javax.servlet.ServletContext;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import java.net.URI;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-@Path("/connectors")
-@Produces(MediaType.APPLICATION_JSON)
-@Consumes(MediaType.APPLICATION_JSON)
-public class ConnectorsResource {
-    // TODO: This should not be so long. However, due to potentially long rebalances that may have to wait a full
-    // session timeout to complete, during which we cannot serve some requests. Ideally we could reduce this, but
-    // we need to consider all possible scenarios this could fail. It might be ok to fail with a timeout in rare cases,
-    // but currently a worker simply leaving the group can take this long as well.
-    private static final long REQUEST_TIMEOUT_MS = 90 * 1000;
-
-    private final Herder herder;
-    @javax.ws.rs.core.Context
-    private ServletContext context;
-
-    public ConnectorsResource(Herder herder) {
-        this.herder = herder;
-    }
-
-    @GET
-    @Path("/")
-    public Collection<String> listConnectors() throws Throwable {
-        FutureCallback<Collection<String>> cb = new FutureCallback<>();
-        herder.connectors(cb);
-        return completeOrForwardRequest(cb, "/connectors", "GET", null, new TypeReference<Collection<String>>() {
-        });
-    }
-
-    @POST
-    @Path("/")
-    public Response createConnector(final CreateConnectorRequest createRequest) throws Throwable {
-        String name = createRequest.name();
-        Map<String, String> configs = createRequest.config();
-        if (!configs.containsKey(ConnectorConfig.NAME_CONFIG))
-            configs.put(ConnectorConfig.NAME_CONFIG, name);
-
-        FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
-        herder.putConnectorConfig(name, configs, false, cb);
-        Herder.Created<ConnectorInfo> info = completeOrForwardRequest(cb, "/connectors", "POST", createRequest,
-                new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator());
-        return Response.created(URI.create("/connectors/" + name)).entity(info.result()).build();
-    }
-
-    @GET
-    @Path("/{connector}")
-    public ConnectorInfo getConnector(final @PathParam("connector") String connector) throws Throwable {
-        FutureCallback<ConnectorInfo> cb = new FutureCallback<>();
-        herder.connectorInfo(connector, cb);
-        return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", null, new TypeReference<ConnectorInfo>() {
-        });
-    }
-
-    @GET
-    @Path("/{connector}/config")
-    public Map<String, String> getConnectorConfig(final @PathParam("connector") String connector) throws Throwable {
-        FutureCallback<Map<String, String>> cb = new FutureCallback<>();
-        herder.connectorConfig(connector, cb);
-        return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", null, new TypeReference<Map<String, String>>() {
-        });
-    }
-
-    @PUT
-    @Path("/{connector}/config")
-    public Response putConnectorConfig(final @PathParam("connector") String connector,
-                                   final Map<String, String> connectorConfig) throws Throwable {
-        FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
-        herder.putConnectorConfig(connector, connectorConfig, true, cb);
-        Herder.Created<ConnectorInfo> createdInfo = completeOrForwardRequest(cb, "/connectors/" + connector + "/config",
-                "PUT", connectorConfig, new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator());
-        Response.ResponseBuilder response;
-        if (createdInfo.created())
-            response = Response.created(URI.create("/connectors/" + connector));
-        else
-            response = Response.ok();
-        return response.entity(createdInfo.result()).build();
-    }
-
-    @GET
-    @Path("/{connector}/tasks")
-    public List<TaskInfo> getTaskConfigs(final @PathParam("connector") String connector) throws Throwable {
-        FutureCallback<List<TaskInfo>> cb = new FutureCallback<>();
-        herder.taskConfigs(connector, cb);
-        return completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "GET", null, new TypeReference<List<TaskInfo>>() {
-        });
-    }
-
-    @POST
-    @Path("/{connector}/tasks")
-    public void putTaskConfigs(final @PathParam("connector") String connector,
-                               final List<Map<String, String>> taskConfigs) throws Throwable {
-        FutureCallback<Void> cb = new FutureCallback<>();
-        herder.putTaskConfigs(connector, taskConfigs, cb);
-        completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", taskConfigs);
-    }
-
-    @DELETE
-    @Path("/{connector}")
-    public void destroyConnector(final @PathParam("connector") String connector) throws Throwable {
-        FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
-        herder.putConnectorConfig(connector, null, true, cb);
-        completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", null);
-    }
-
-    // Wait for a FutureCallback to complete. If it succeeds, return the parsed response. If it fails, try to forward the
-    // request to the leader.
-    private <T, U> T completeOrForwardRequest(
-            FutureCallback<T> cb, String path, String method, Object body, TypeReference<U> resultType,
-            Translator<T, U> translator) throws Throwable {
-        try {
-            return cb.get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            if (e.getCause() instanceof NotLeaderException) {
-                NotLeaderException notLeaderError = (NotLeaderException) e.getCause();
-                return translator.translate(RestServer.httpRequest(RestServer.urlJoin(notLeaderError.leaderUrl(), path), method, body, resultType));
-            }
-
-            throw e.getCause();
-        } catch (TimeoutException e) {
-            // This timeout is for the operation itself. None of the timeout error codes are relevant, so internal server
-            // error is the best option
-            throw new CopycatRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request timed out");
-        } catch (InterruptedException e) {
-            throw new CopycatRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request interrupted");
-        }
-    }
-
-    private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, Object body, TypeReference<T> resultType) throws Throwable {
-        return completeOrForwardRequest(cb, path, method, body, resultType, new IdentityTranslator<T>());
-    }
-
-    private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, Object body) throws Throwable {
-        return completeOrForwardRequest(cb, path, method, body, null, new IdentityTranslator<T>());
-    }
-
-    private interface Translator<T, U> {
-        T translate(RestServer.HttpResponse<U> response);
-    }
-
-    private class IdentityTranslator<T> implements Translator<T, T> {
-        @Override
-        public T translate(RestServer.HttpResponse<T> response) {
-            return response.body();
-        }
-    }
-
-    private class CreatedConnectorInfoTranslator implements Translator<Herder.Created<ConnectorInfo>, ConnectorInfo> {
-        @Override
-        public Herder.Created<ConnectorInfo> translate(RestServer.HttpResponse<ConnectorInfo> response) {
-            boolean created = response.status() == 201;
-            return new Herder.Created<>(created, response.body());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/resources/RootResource.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/resources/RootResource.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/resources/RootResource.java
deleted file mode 100644
index d012c5b..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/resources/RootResource.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime.rest.resources;
-
-import org.apache.kafka.copycat.runtime.rest.entities.ServerInfo;
-
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
-
-@Path("/")
-@Produces(MediaType.APPLICATION_JSON)
-public class RootResource {
-
-    @GET
-    @Path("/")
-    public ServerInfo serverInfo() {
-        return new ServerInfo();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConfig.java
deleted file mode 100644
index 6e547d3..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConfig.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime.standalone;
-
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.copycat.runtime.WorkerConfig;
-
-import java.util.Map;
-
-public class StandaloneConfig extends WorkerConfig {
-    private static final ConfigDef CONFIG;
-
-    static {
-        CONFIG = baseConfigDef();
-    }
-
-    public StandaloneConfig(Map<String, String> props) {
-        super(CONFIG, props);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
deleted file mode 100644
index 24a789a..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime.standalone;
-
-import org.apache.kafka.copycat.errors.AlreadyExistsException;
-import org.apache.kafka.copycat.errors.CopycatException;
-import org.apache.kafka.copycat.errors.NotFoundException;
-import org.apache.kafka.copycat.runtime.ConnectorConfig;
-import org.apache.kafka.copycat.runtime.Herder;
-import org.apache.kafka.copycat.runtime.HerderConnectorContext;
-import org.apache.kafka.copycat.runtime.TaskConfig;
-import org.apache.kafka.copycat.runtime.Worker;
-import org.apache.kafka.copycat.runtime.rest.entities.ConnectorInfo;
-import org.apache.kafka.copycat.runtime.rest.entities.TaskInfo;
-import org.apache.kafka.copycat.util.Callback;
-import org.apache.kafka.copycat.util.ConnectorTaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-
-
-/**
- * Single process, in-memory "herder". Useful for a standalone copycat process.
- */
-public class StandaloneHerder implements Herder {
-    private static final Logger log = LoggerFactory.getLogger(StandaloneHerder.class);
-
-    private final Worker worker;
-    private HashMap<String, ConnectorState> connectors = new HashMap<>();
-
-    public StandaloneHerder(Worker worker) {
-        this.worker = worker;
-    }
-
-    public synchronized void start() {
-        log.info("Herder starting");
-        log.info("Herder started");
-    }
-
-    public synchronized void stop() {
-        log.info("Herder stopping");
-
-        // There's no coordination/hand-off to do here since this is all standalone. Instead, we
-        // should just clean up the stuff we normally would, i.e. cleanly checkpoint and shutdown all
-        // the tasks.
-        for (String connName : new HashSet<>(connectors.keySet())) {
-            removeConnectorTasks(connName);
-            try {
-                worker.stopConnector(connName);
-            } catch (CopycatException e) {
-                log.error("Error shutting down connector {}: ", connName, e);
-            }
-        }
-        connectors.clear();
-
-        log.info("Herder stopped");
-    }
-
-    @Override
-    public synchronized void connectors(Callback<Collection<String>> callback) {
-        callback.onCompletion(null, new ArrayList<>(connectors.keySet()));
-    }
-
-    @Override
-    public synchronized void connectorInfo(String connName, Callback<ConnectorInfo> callback) {
-        ConnectorState state = connectors.get(connName);
-        if (state == null) {
-            callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
-            return;
-        }
-        callback.onCompletion(null, createConnectorInfo(state));
-    }
-
-    private ConnectorInfo createConnectorInfo(ConnectorState state) {
-        if (state == null)
-            return null;
-
-        List<ConnectorTaskId> taskIds = new ArrayList<>();
-        for (int i = 0; i < state.taskConfigs.size(); i++)
-            taskIds.add(new ConnectorTaskId(state.name, i));
-        return new ConnectorInfo(state.name, state.configOriginals, taskIds);
-    }
-
-    @Override
-    public void connectorConfig(String connName, final Callback<Map<String, String>> callback) {
-        // Subset of connectorInfo, so piggy back on that implementation
-        connectorInfo(connName, new Callback<ConnectorInfo>() {
-            @Override
-            public void onCompletion(Throwable error, ConnectorInfo result) {
-                if (error != null) {
-                    callback.onCompletion(error, null);
-                    return;
-                }
-                callback.onCompletion(null, result.config());
-            }
-        });
-    }
-
-    @Override
-    public synchronized void putConnectorConfig(String connName, final Map<String, String> config,
-                                                boolean allowReplace,
-                                                final Callback<Created<ConnectorInfo>> callback) {
-        try {
-            boolean created = false;
-            if (connectors.containsKey(connName)) {
-                if (!allowReplace) {
-                    callback.onCompletion(new AlreadyExistsException("Connector " + connName + " already exists"), null);
-                    return;
-                }
-                if (config == null) // Deletion, kill tasks as well
-                    removeConnectorTasks(connName);
-                worker.stopConnector(connName);
-                if (config == null)
-                    connectors.remove(connName);
-            } else {
-                if (config == null) {
-                    // Deletion, must already exist
-                    callback.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null);
-                    return;
-                }
-                created = true;
-            }
-            if (config != null) {
-                startConnector(config);
-                updateConnectorTasks(connName);
-            }
-            if (config != null)
-                callback.onCompletion(null, new Created<>(created, createConnectorInfo(connectors.get(connName))));
-            else
-                callback.onCompletion(null, new Created<ConnectorInfo>(false, null));
-        } catch (CopycatException e) {
-            callback.onCompletion(e, null);
-        }
-
-    }
-
-    @Override
-    public synchronized void requestTaskReconfiguration(String connName) {
-        if (!worker.connectorNames().contains(connName)) {
-            log.error("Task that requested reconfiguration does not exist: {}", connName);
-            return;
-        }
-        updateConnectorTasks(connName);
-    }
-
-    @Override
-    public synchronized void taskConfigs(String connName, Callback<List<TaskInfo>> callback) {
-        ConnectorState state = connectors.get(connName);
-        if (state == null) {
-            callback.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null);
-            return;
-        }
-
-        List<TaskInfo> result = new ArrayList<>();
-        for (int i = 0; i < state.taskConfigs.size(); i++) {
-            TaskInfo info = new TaskInfo(new ConnectorTaskId(connName, i), state.taskConfigs.get(i));
-            result.add(info);
-        }
-        callback.onCompletion(null, result);
-    }
-
-    @Override
-    public void putTaskConfigs(String connName, List<Map<String, String>> configs, Callback<Void> callback) {
-        throw new UnsupportedOperationException("Copycat in standalone mode does not support externally setting task configurations.");
-    }
-
-    /**
-     * Start a connector in the worker and record its state.
-     * @param connectorProps new connector configuration
-     * @return the connector name
-     */
-    private String startConnector(Map<String, String> connectorProps) {
-        ConnectorConfig connConfig = new ConnectorConfig(connectorProps);
-        String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
-        ConnectorState state = connectors.get(connName);
-        worker.addConnector(connConfig, new HerderConnectorContext(this, connName));
-        if (state == null) {
-            connectors.put(connName, new ConnectorState(connectorProps, connConfig));
-        } else {
-            state.configOriginals = connectorProps;
-            state.config = connConfig;
-        }
-        return connName;
-    }
-
-
-    private List<Map<String, String>> recomputeTaskConfigs(String connName) {
-        ConnectorState state = connectors.get(connName);
-        return worker.connectorTaskConfigs(connName,
-                state.config.getInt(ConnectorConfig.TASKS_MAX_CONFIG),
-                state.config.getList(ConnectorConfig.TOPICS_CONFIG));
-    }
-
-    private void createConnectorTasks(String connName) {
-        ConnectorState state = connectors.get(connName);
-        int index = 0;
-        for (Map<String, String> taskConfigMap : state.taskConfigs) {
-            ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
-            TaskConfig config = new TaskConfig(taskConfigMap);
-            try {
-                worker.addTask(taskId, config);
-            } catch (Throwable e) {
-                log.error("Failed to add task {}: ", taskId, e);
-                // Swallow this so we can continue updating the rest of the tasks
-                // FIXME what's the proper response? Kill all the tasks? Consider this the same as a task
-                // that died after starting successfully.
-            }
-            index++;
-        }
-    }
-
-    private void removeConnectorTasks(String connName) {
-        ConnectorState state = connectors.get(connName);
-        for (int i = 0; i < state.taskConfigs.size(); i++) {
-            ConnectorTaskId taskId = new ConnectorTaskId(connName, i);
-            try {
-                worker.stopTask(taskId);
-            } catch (CopycatException e) {
-                log.error("Failed to stop task {}: ", taskId, e);
-                // Swallow this so we can continue stopping the rest of the tasks
-                // FIXME: Forcibly kill the task?
-            }
-        }
-        state.taskConfigs = new ArrayList<>();
-    }
-
-    private void updateConnectorTasks(String connName) {
-        List<Map<String, String>> newTaskConfigs = recomputeTaskConfigs(connName);
-        ConnectorState state = connectors.get(connName);
-        if (!newTaskConfigs.equals(state.taskConfigs)) {
-            removeConnectorTasks(connName);
-            state.taskConfigs = newTaskConfigs;
-            createConnectorTasks(connName);
-        }
-    }
-
-
-    private static class ConnectorState {
-        public String name;
-        public Map<String, String> configOriginals;
-        public ConnectorConfig config;
-        List<Map<String, String>> taskConfigs;
-
-        public ConnectorState(Map<String, String> configOriginals, ConnectorConfig config) {
-            this.name = config.getString(ConnectorConfig.NAME_CONFIG);
-            this.configOriginals = configOriginals;
-            this.config = config;
-            this.taskConfigs = new ArrayList<>();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/FileOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/FileOffsetBackingStore.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/FileOffsetBackingStore.java
deleted file mode 100644
index f707fd6..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/FileOffsetBackingStore.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.storage;
-
-import org.apache.kafka.copycat.errors.CopycatException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Implementation of OffsetBackingStore that saves data locally to a file. To ensure this behaves
- * similarly to a real backing store, operations are executed asynchronously on a background thread.
- */
-public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
-    private static final Logger log = LoggerFactory.getLogger(FileOffsetBackingStore.class);
-
-    public final static String OFFSET_STORAGE_FILE_FILENAME_CONFIG = "offset.storage.file.filename";
-    private File file;
-
-    public FileOffsetBackingStore() {
-
-    }
-
-    @Override
-    public void configure(Map<String, ?> props) {
-        super.configure(props);
-        String filename = (String) props.get(OFFSET_STORAGE_FILE_FILENAME_CONFIG);
-        file = new File(filename);
-    }
-
-    @Override
-    public synchronized void start() {
-        super.start();
-        log.info("Starting FileOffsetBackingStore with file {}", file);
-        load();
-    }
-
-    @Override
-    public synchronized void stop() {
-        super.stop();
-        // Nothing to do since this doesn't maintain any outstanding connections/data
-        log.info("Stopped FileOffsetBackingStore");
-    }
-
-    @SuppressWarnings("unchecked")
-    private void load() {
-        try {
-            ObjectInputStream is = new ObjectInputStream(new FileInputStream(file));
-            Object obj = is.readObject();
-            if (!(obj instanceof HashMap))
-                throw new CopycatException("Expected HashMap but found " + obj.getClass());
-            Map<byte[], byte[]> raw = (Map<byte[], byte[]>) obj;
-            data = new HashMap<>();
-            for (Map.Entry<byte[], byte[]> mapEntry : raw.entrySet()) {
-                ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey()) : null;
-                ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) : null;
-                data.put(key, value);
-            }
-            is.close();
-        } catch (FileNotFoundException | EOFException e) {
-            // FileNotFoundException: Ignore, may be new.
-            // EOFException: Ignore, this means the file was missing or corrupt
-        } catch (IOException | ClassNotFoundException e) {
-            throw new CopycatException(e);
-        }
-    }
-
-    protected void save() {
-        try {
-            ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(file));
-            Map<byte[], byte[]> raw = new HashMap<>();
-            for (Map.Entry<ByteBuffer, ByteBuffer> mapEntry : data.entrySet()) {
-                byte[] key = (mapEntry.getKey() != null) ? mapEntry.getKey().array() : null;
-                byte[] value = (mapEntry.getValue() != null) ? mapEntry.getValue().array() : null;
-                raw.put(key, value);
-            }
-            os.writeObject(raw);
-            os.close();
-        } catch (IOException e) {
-            throw new CopycatException(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaConfigStorage.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaConfigStorage.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaConfigStorage.java
deleted file mode 100644
index fb4f70d..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaConfigStorage.java
+++ /dev/null
@@ -1,578 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.storage;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.copycat.data.Schema;
-import org.apache.kafka.copycat.data.SchemaAndValue;
-import org.apache.kafka.copycat.data.SchemaBuilder;
-import org.apache.kafka.copycat.data.Struct;
-import org.apache.kafka.copycat.errors.CopycatException;
-import org.apache.kafka.copycat.errors.DataException;
-import org.apache.kafka.copycat.runtime.distributed.ClusterConfigState;
-import org.apache.kafka.copycat.util.Callback;
-import org.apache.kafka.copycat.util.ConnectorTaskId;
-import org.apache.kafka.copycat.util.KafkaBasedLog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * <p>
- * Provides persistent storage of Copycat connector configurations in a Kafka topic.
- * </p>
- * <p>
- * This class manages both connector and task configurations. It tracks three types of configuration entries:
- * <p/>
- * 1. Connector config: map of string -> string configurations passed to the Connector class, with support for
- * expanding this format if necessary. (Kafka key: connector-[connector-id]).
- * These configs are *not* ephemeral. They represent the source of truth. If the entire Copycat
- * cluster goes down, this is all that is really needed to recover.
- * 2. Task configs: map of string -> string configurations passed to the Task class, with support for expanding
- * this format if necessary. (Kafka key: task-[connector-id]-[task-id]).
- * These configs are ephemeral; they are stored here to a) disseminate them to all workers while
- * ensuring agreement and b) to allow faster cluster/worker recovery since the common case
- * of recovery (restoring a connector) will simply result in the same configuration as before
- * the failure.
- * 3. Task commit "configs": records indicating that previous task config entries should be committed and all task
- * configs for a connector can be applied. (Kafka key: commit-[connector-id].
- * This config has two effects. First, it records the number of tasks the connector is currently
- * running (and can therefore increase/decrease parallelism). Second, because each task config
- * is stored separately but they need to be applied together to ensure each partition is assigned
- * to a single task, this record also indicates that task configs for the specified connector
- * can be "applied" or "committed".
- * </p>
- * <p>
- * This configuration is expected to be stored in a *single partition* and *compacted* topic. Using a single partition
- * ensures we can enforce ordering on messages, allowing Kafka to be used as a write ahead log. Compaction allows
- * us to clean up outdated configurations over time. However, this combination has some important implications for
- * the implementation of this class and the configuration state that it may expose.
- * </p>
- * <p>
- * Connector configurations are independent of all other configs, so they are handled easily. Writing a single record
- * is already atomic, so these can be applied as soon as they are read. One connectors config does not affect any
- * others, and they do not need to coordinate with the connector's task configuration at all.
- * </p>
- * <p>
- * The most obvious implication for task configs is the need for the commit messages. Because Kafka does not
- * currently have multi-record transactions or support atomic batch record writes, task commit messages are required
- * to ensure that readers do not end up using inconsistent configs. For example, consider if a connector wrote configs
- * for its tasks, then was reconfigured and only managed to write updated configs for half its tasks. If task configs
- * were applied immediately you could be using half the old configs and half the new configs. In that condition, some
- * partitions may be double-assigned because the old config and new config may use completely different assignments.
- * Therefore, when reading the log, we must buffer config updates for a connector's tasks and only apply atomically them
- * once a commit message has been read.
- * </p>
- * <p>
- * However, there are also further challenges. This simple buffering approach would work fine as long as the entire log was
- * always available, but we would like to be able to enable compaction so our configuration topic does not grow
- * indefinitely. Compaction may break a normal log because old entries will suddenly go missing. A new worker reading
- * from the beginning of the log in order to build up the full current configuration will see task commits, but some
- * records required for those commits will have been removed because the same keys have subsequently been rewritten.
- * For example, if you have a sequence of record keys [connector-foo-config, task-foo-1-config, task-foo-2-config,
- * commit-foo (2 tasks), task-foo-1-config, commit-foo (1 task)], we can end up with a compacted log containing
- * [connector-foo-config, task-foo-2-config, commit-foo (2 tasks), task-foo-1-config, commit-foo (1 task)]. When read
- * back, the first commit will see an invalid state because the first task-foo-1-config has been cleaned up.
- * </p>
- * <p>
- * Compaction can further complicate things if writing new task configs fails mid-write. Consider a similar scenario
- * as the previous one, but in this case both the first and second update will write 2 task configs. However, the
- * second write fails half of the way through:
- * [connector-foo-config, task-foo-1-config, task-foo-2-config, commit-foo (2 tasks), task-foo-1-config]. Now compaction
- * occurs and we're left with
- * [connector-foo-config, task-foo-2-config, commit-foo (2 tasks), task-foo-1-config]. At the first commit, we don't
- * have a complete set of configs. And because of the failure, there is no second commit. We are left in an inconsistent
- * state with no obvious way to resolve the issue -- we can try to keep on reading, but the failed node may never
- * recover and write the updated config. Meanwhile, other workers may have seen the entire log; they will see the second
- * task-foo-1-config waiting to be applied, but will otherwise think everything is ok -- they have a valid set of task
- * configs for connector "foo".
- * </p>
- * <p>
- * Because we can encounter these inconsistencies and addressing them requires support from the rest of the system
- * (resolving the task configuration inconsistencies requires support from the connector instance to regenerate updated
- * configs), this class exposes not only the current set of configs, but also which connectors have inconsistent data.
- * This allows users of this class (i.e., Herder implementations) to take action to resolve any inconsistencies. These
- * inconsistencies should be rare (as described above, due to compaction combined with leader failures in the middle
- * of updating task configurations).
- * </p>
- * <p>
- * Note that the expectation is that this config storage system has only a single writer at a time.
- * The caller (Herder) must ensure this is the case. In distributed mode this will require forwarding config change
- * requests to the leader in the cluster (i.e. the worker group coordinated by the Kafka broker).
- * </p>
- * <p>
- * Since processing of the config log occurs in a background thread, callers must take care when using accessors.
- * To simplify handling this correctly, this class only exposes a mechanism to snapshot the current state of the cluster.
- * Updates may continue to be applied (and callbacks invoked) in the background. Callers must take care that they are
- * using a consistent snapshot and only update when it is safe. In particular, if task configs are updated which require
- * synchronization across workers to commit offsets and update the configuration, callbacks and updates during the
- * rebalance must be deferred.
- * </p>
- */
-public class KafkaConfigStorage {
-    private static final Logger log = LoggerFactory.getLogger(KafkaConfigStorage.class);
-
-    public static final String CONFIG_TOPIC_CONFIG = "config.storage.topic";
-
-    public static final String CONNECTOR_PREFIX = "connector-";
-
-    public static String CONNECTOR_KEY(String connectorName) {
-        return CONNECTOR_PREFIX + connectorName;
-    }
-
-    public static final String TASK_PREFIX = "task-";
-
-    public static String TASK_KEY(ConnectorTaskId taskId) {
-        return TASK_PREFIX + taskId.connector() + "-" + taskId.task();
-    }
-
-    public static final String COMMIT_TASKS_PREFIX = "commit-";
-
-    public static String COMMIT_TASKS_KEY(String connectorName) {
-        return COMMIT_TASKS_PREFIX + connectorName;
-    }
-
-    // Note that while using real serialization for values as we have here, but ad hoc string serialization for keys,
-    // isn't ideal, we use this approach because it avoids any potential problems with schema evolution or
-    // converter/serializer changes causing keys to change. We need to absolutely ensure that the keys remain precisely
-    // the same.
-    public static final Schema CONNECTOR_CONFIGURATION_V0 = SchemaBuilder.struct()
-            .field("properties", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA))
-            .build();
-    public static final Schema TASK_CONFIGURATION_V0 = CONNECTOR_CONFIGURATION_V0;
-    public static final Schema CONNECTOR_TASKS_COMMIT_V0 = SchemaBuilder.struct()
-            .field("tasks", Schema.INT32_SCHEMA)
-            .build();
-
-    private static final long READ_TO_END_TIMEOUT_MS = 30000;
-
-    private final Object lock;
-    private boolean starting;
-    private final Converter converter;
-    private final Callback<String> connectorConfigCallback;
-    private final Callback<List<ConnectorTaskId>> tasksConfigCallback;
-    private String topic;
-    // Data is passed to the log already serialized. We use a converter to handle translating to/from generic Copycat
-    // format to serialized form
-    private KafkaBasedLog<String, byte[]> configLog;
-    // Connector -> # of tasks
-    private Map<String, Integer> connectorTaskCounts = new HashMap<>();
-    // Connector and task configs: name or id -> config map
-    private Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
-    private Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
-    // Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data
-    // is in an inconsistent state and we cannot safely use them until they have been refreshed.
-    private Set<String> inconsistent = new HashSet<>();
-    // The most recently read offset. This does not take into account deferred task updates/commits, so we may have
-    // outstanding data to be applied.
-    private long offset;
-
-    // Connector -> Map[ConnectorTaskId -> Configs]
-    private Map<String, Map<ConnectorTaskId, Map<String, String>>> deferredTaskUpdates = new HashMap<>();
-
-
-    public KafkaConfigStorage(Converter converter, Callback<String> connectorConfigCallback, Callback<List<ConnectorTaskId>> tasksConfigCallback) {
-        this.lock = new Object();
-        this.starting = false;
-        this.converter = converter;
-        this.connectorConfigCallback = connectorConfigCallback;
-        this.tasksConfigCallback = tasksConfigCallback;
-
-        offset = -1;
-    }
-
-    public void configure(Map<String, ?> configs) {
-        if (configs.get(CONFIG_TOPIC_CONFIG) == null)
-            throw new CopycatException("Must specify topic for Copycat connector configuration.");
-        topic = (String) configs.get(CONFIG_TOPIC_CONFIG);
-
-        Map<String, Object> producerProps = new HashMap<>();
-        producerProps.putAll(configs);
-        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
-        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
-
-        Map<String, Object> consumerProps = new HashMap<>();
-        consumerProps.putAll(configs);
-        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
-        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-
-        configLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback);
-    }
-
-    public void start() {
-        log.info("Starting KafkaConfigStorage");
-        // During startup, callbacks are *not* invoked. You can grab a snapshot after starting -- just take care that
-        // updates can continue to occur in the background
-        starting = true;
-        configLog.start();
-        starting = false;
-        log.info("Started KafkaConfigStorage");
-    }
-
-    public void stop() {
-        log.info("Closing KafkaConfigStorage");
-        configLog.stop();
-        log.info("Closed KafkaConfigStorage");
-    }
-
-    /**
-     * Get a snapshot of the current state of the cluster.
-     */
-    public ClusterConfigState snapshot() {
-        synchronized (lock) {
-            // Doing a shallow copy of the data is safe here because the complex nested data that is copied should all be
-            // immutable configs
-            return new ClusterConfigState(
-                    offset,
-                    new HashMap<>(connectorTaskCounts),
-                    new HashMap<>(connectorConfigs),
-                    new HashMap<>(taskConfigs),
-                    new HashSet<>(inconsistent)
-            );
-        }
-    }
-
-    /**
-     * Write this connector configuration to persistent storage and wait until it has been acknowledge and read back by
-     * tailing the Kafka log with a consumer.
-     *
-     * @param connector  name of the connector to write data for
-     * @param properties the configuration to write
-     */
-    public void putConnectorConfig(String connector, Map<String, String> properties) {
-        byte[] serializedConfig;
-        if (properties == null) {
-            serializedConfig = null;
-        } else {
-            Struct copycatConfig = new Struct(CONNECTOR_CONFIGURATION_V0);
-            copycatConfig.put("properties", properties);
-            serializedConfig = converter.fromCopycatData(topic, CONNECTOR_CONFIGURATION_V0, copycatConfig);
-        }
-
-        try {
-            configLog.send(CONNECTOR_KEY(connector), serializedConfig);
-            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException | ExecutionException | TimeoutException e) {
-            log.error("Failed to write connector configuration to Kafka: ", e);
-            throw new CopycatException("Error writing connector configuration to Kafka", e);
-        }
-    }
-
-    /**
-     * Write these task configurations and associated commit messages, unless an inconsistency is found that indicates
-     * that we would be leaving one of the referenced connectors with an inconsistent state.
-     *
-     * @param configs map containing task configurations
-     * @throws CopycatException if the task configurations do not resolve inconsistencies found in the existing root
-     *                          and task configurations.
-     */
-    public void putTaskConfigs(Map<ConnectorTaskId, Map<String, String>> configs) {
-        // Make sure we're at the end of the log. We should be the only writer, but we want to make sure we don't have
-        // any outstanding lagging data to consume.
-        try {
-            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException | ExecutionException | TimeoutException e) {
-            log.error("Failed to write root configuration to Kafka: ", e);
-            throw new CopycatException("Error writing root configuration to Kafka", e);
-        }
-
-        // In theory, there is only a single writer and we shouldn't need this lock since the background thread should
-        // not invoke any callbacks that would conflict, but in practice this guards against inconsistencies due to
-        // the root config being updated.
-        Map<String, Integer> newTaskCounts = new HashMap<>();
-        synchronized (lock) {
-            // Validate tasks in this assignment. Any task configuration updates should include updates for *all* tasks
-            // in the connector -- we should have all task IDs 0 - N-1 within a connector if any task is included here
-            Map<String, Set<Integer>> updatedConfigIdsByConnector = taskIdsByConnector(configs);
-            for (Map.Entry<String, Set<Integer>> taskConfigSetEntry : updatedConfigIdsByConnector.entrySet()) {
-                if (!completeTaskIdSet(taskConfigSetEntry.getValue(), taskConfigSetEntry.getValue().size())) {
-                    log.error("Submitted task configuration contain invalid range of task IDs, ignoring this submission");
-                    throw new CopycatException("Error writing task configurations: found some connectors with invalid connectors");
-                }
-                newTaskCounts.put(taskConfigSetEntry.getKey(), taskConfigSetEntry.getValue().size());
-            }
-        }
-
-        // Start sending all the individual updates
-        for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfigEntry : configs.entrySet()) {
-            Struct copycatConfig = new Struct(TASK_CONFIGURATION_V0);
-            copycatConfig.put("properties", taskConfigEntry.getValue());
-            byte[] serializedConfig = converter.fromCopycatData(topic, TASK_CONFIGURATION_V0, copycatConfig);
-            configLog.send(TASK_KEY(taskConfigEntry.getKey()), serializedConfig);
-        }
-
-        // Finally, send the commit to update the number of tasks and apply the new configs, then wait until we read to
-        // the end of the log
-        try {
-            // Read to end to ensure all the task configs have been written
-            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
-
-            // Write all the commit messages
-            for (Map.Entry<String, Integer> taskCountEntry : newTaskCounts.entrySet()) {
-                Struct copycatConfig = new Struct(CONNECTOR_TASKS_COMMIT_V0);
-                copycatConfig.put("tasks", taskCountEntry.getValue());
-                byte[] serializedConfig = converter.fromCopycatData(topic, CONNECTOR_TASKS_COMMIT_V0, copycatConfig);
-                configLog.send(COMMIT_TASKS_KEY(taskCountEntry.getKey()), serializedConfig);
-            }
-
-            // Read to end to ensure all the commit messages have been written
-            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException | ExecutionException | TimeoutException e) {
-            log.error("Failed to write root configuration to Kafka: ", e);
-            throw new CopycatException("Error writing root configuration to Kafka", e);
-        }
-    }
-
-    public Future<Void> readToEnd() {
-        return configLog.readToEnd();
-    }
-
-    public void readToEnd(Callback<Void> cb) {
-        configLog.readToEnd(cb);
-    }
-
-    private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
-                                                              Map<String, Object> consumerProps, Callback<ConsumerRecord<String, byte[]>> consumedCallback) {
-        return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, new SystemTime());
-    }
-
-    private final Callback<ConsumerRecord<String, byte[]>> consumedCallback = new Callback<ConsumerRecord<String, byte[]>>() {
-        @Override
-        public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
-            if (error != null) {
-                log.error("Unexpected in consumer callback for KafkaConfigStorage: ", error);
-                return;
-            }
-
-            final SchemaAndValue value;
-            try {
-                value = converter.toCopycatData(topic, record.value());
-            } catch (DataException e) {
-                log.error("Failed to convert config data to Copycat format: ", e);
-                return;
-            }
-            // Make the recorded offset match the API used for positions in the consumer -- return the offset of the
-            // *next record*, not the last one consumed.
-            offset = record.offset() + 1;
-
-            if (record.key().startsWith(CONNECTOR_PREFIX)) {
-                String connectorName = record.key().substring(CONNECTOR_PREFIX.length());
-                synchronized (lock) {
-                    if (value.value() == null) {
-                        // Connector deletion will be written as a null value
-                        connectorConfigs.remove(connectorName);
-                    } else {
-                        // Connector configs can be applied and callbacks invoked immediately
-                        if (!(value.value() instanceof Map)) {
-                            log.error("Found connector configuration (" + record.key() + ") in wrong format: " + value.value().getClass());
-                            return;
-                        }
-                        Object newConnectorConfig = ((Map<String, Object>) value.value()).get("properties");
-                        if (!(newConnectorConfig instanceof Map)) {
-                            log.error("Invalid data for connector config: properties filed should be a Map but is " + newConnectorConfig.getClass());
-                            return;
-                        }
-                        connectorConfigs.put(connectorName, (Map<String, String>) newConnectorConfig);
-                    }
-                }
-                if (!starting)
-                    connectorConfigCallback.onCompletion(null, connectorName);
-            } else if (record.key().startsWith(TASK_PREFIX)) {
-                synchronized (lock) {
-                    ConnectorTaskId taskId = parseTaskId(record.key());
-                    if (taskId == null) {
-                        log.error("Ignoring task configuration because " + record.key() + " couldn't be parsed as a task config key");
-                        return;
-                    }
-                    if (!(value.value() instanceof Map)) {
-                        log.error("Ignoring task configuration because it is in the wrong format: " + value.value());
-                        return;
-                    }
-
-                    Object newTaskConfig = ((Map<String, Object>) value.value()).get("properties");
-                    if (!(newTaskConfig instanceof Map)) {
-                        log.error("Invalid data for task config: properties filed should be a Map but is " + newTaskConfig.getClass());
-                        return;
-                    }
-
-                    Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(taskId.connector());
-                    if (deferred == null) {
-                        deferred = new HashMap<>();
-                        deferredTaskUpdates.put(taskId.connector(), deferred);
-                    }
-                    deferred.put(taskId, (Map<String, String>) newTaskConfig);
-                }
-            } else if (record.key().startsWith(COMMIT_TASKS_PREFIX)) {
-                String connectorName = record.key().substring(COMMIT_TASKS_PREFIX.length());
-                List<ConnectorTaskId> updatedTasks = new ArrayList<>();
-                synchronized (lock) {
-                    // Apply any outstanding deferred task updates for the given connector. Note that just because we
-                    // encounter a commit message does not mean it will result in consistent output. In particular due to
-                    // compaction, there may be cases where . For example if we have the following sequence of writes:
-                    //
-                    // 1. Write connector "foo"'s config
-                    // 2. Write connector "foo", task 1's config <-- compacted
-                    // 3. Write connector "foo", task 2's config
-                    // 4. Write connector "foo" task commit message
-                    // 5. Write connector "foo", task 1's config
-                    // 6. Write connector "foo", task 2's config
-                    // 7. Write connector "foo" task commit message
-                    //
-                    // then when a new worker starts up, if message 2 had been compacted, then when message 4 is applied
-                    // "foo" will not have a complete set of configs. Only when message 7 is applied will the complete
-                    // configuration be available. Worse, if the leader died while writing messages 5, 6, and 7 such that
-                    // only 5 was written, then there may be nothing that will finish writing the configs and get the
-                    // log back into a consistent state.
-                    //
-                    // It is expected that the user of this class (i.e., the Herder) will take the necessary action to
-                    // resolve this (i.e., get the connector to recommit its configuration). This inconsistent state is
-                    // exposed in the snapshots provided via ClusterConfigState so they are easy to handle.
-                    if (!(value.value() instanceof Map)) { // Schema-less, so we get maps instead of structs
-                        log.error("Ignoring connector tasks configuration commit because it is in the wrong format: " + value.value());
-                        return;
-                    }
-
-                    Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connectorName);
-
-                    int newTaskCount = intValue(((Map<String, Object>) value.value()).get("tasks"));
-
-                    // Validate the configs we're supposed to update to ensure we're getting a complete configuration
-                    // update of all tasks that are expected based on the number of tasks in the commit message.
-                    Map<String, Set<Integer>> updatedConfigIdsByConnector = taskIdsByConnector(deferred);
-                    Set<Integer> taskIdSet = updatedConfigIdsByConnector.get(connectorName);
-                    if (!completeTaskIdSet(taskIdSet, newTaskCount)) {
-                        // Given the logic for writing commit messages, we should only hit this condition due to compacted
-                        // historical data, in which case we would not have applied any updates yet and there will be no
-                        // task config data already committed for the connector, so we shouldn't have to clear any data
-                        // out. All we need to do is add the flag marking it inconsistent.
-                        inconsistent.add(connectorName);
-                    } else {
-                        if (deferred != null) {
-                            taskConfigs.putAll(deferred);
-                            updatedTasks.addAll(taskConfigs.keySet());
-                        }
-                        inconsistent.remove(connectorName);
-                    }
-                    // Always clear the deferred entries, even if we didn't apply them. If they represented an inconsistent
-                    // update, then we need to see a completely fresh set of configs after this commit message, so we don't
-                    // want any of these outdated configs
-                    if (deferred != null)
-                        deferred.clear();
-
-                    connectorTaskCounts.put(connectorName, newTaskCount);
-                }
-
-                if (!starting)
-                    tasksConfigCallback.onCompletion(null, updatedTasks);
-            } else {
-                log.error("Discarding config update record with invalid key: " + record.key());
-            }
-        }
-    };
-
-    private ConnectorTaskId parseTaskId(String key) {
-        String[] parts = key.split("-");
-        if (parts.length < 3) return null;
-
-        try {
-            int taskNum = Integer.parseInt(parts[parts.length - 1]);
-            String connectorName = Utils.join(Arrays.copyOfRange(parts, 1, parts.length - 1), "-");
-            return new ConnectorTaskId(connectorName, taskNum);
-        } catch (NumberFormatException e) {
-            return null;
-        }
-    }
-
-    /**
-     * Given task configurations, get a set of integer task IDs organized by connector name.
-     */
-    private Map<String, Set<Integer>> taskIdsByConnector(Map<ConnectorTaskId, Map<String, String>> configs) {
-        Map<String, Set<Integer>> connectorTaskIds = new HashMap<>();
-        if (configs == null)
-            return connectorTaskIds;
-        for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfigEntry : configs.entrySet()) {
-            ConnectorTaskId taskId = taskConfigEntry.getKey();
-            if (!connectorTaskIds.containsKey(taskId.connector()))
-                connectorTaskIds.put(taskId.connector(), new TreeSet<Integer>());
-            connectorTaskIds.get(taskId.connector()).add(taskId.task());
-        }
-        return connectorTaskIds;
-    }
-
-    private boolean completeTaskIdSet(Set<Integer> idSet, int expectedSize) {
-        // Note that we do *not* check for the exact set. This is an important implication of compaction. If we start out
-        // with 2 tasks, then reduce to 1, we'll end up with log entries like:
-        //
-        // 1. Connector "foo" config
-        // 2. Connector "foo", task 1 config
-        // 3. Connector "foo", task 2 config
-        // 4. Connector "foo", commit 2 tasks
-        // 5. Connector "foo", task 1 config
-        // 6. Connector "foo", commit 1 tasks
-        //
-        // However, due to compaction we could end up with a log that looks like this:
-        //
-        // 1. Connector "foo" config
-        // 3. Connector "foo", task 2 config
-        // 5. Connector "foo", task 1 config
-        // 6. Connector "foo", commit 1 tasks
-        //
-        // which isn't incorrect, but would appear in this code to have an extra task configuration. Instead, we just
-        // validate that all the configs specified by the commit message are present. This should be fine because the
-        // logic for writing configs ensures all the task configs are written (and reads them back) before writing the
-        // commit message.
-
-        if (idSet.size() < expectedSize)
-            return false;
-
-        for (int i = 0; i < expectedSize; i++)
-            if (!idSet.contains(i))
-                return false;
-        return true;
-    }
-
-    // Convert an integer value extracted from a schemaless struct to an int. This handles potentially different
-    // encodings by different Converters.
-    private static int intValue(Object value) {
-        if (value instanceof Integer)
-            return (int) value;
-        else if (value instanceof Long)
-            return (int) (long) value;
-        else
-            throw new CopycatException("Expected integer value to be either Integer or Long");
-    }
-}
-