You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/11/09 07:11:40 UTC
[21/26] kafka git commit: KAFKA-2774: Rename Copycat to Kafka Connect
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/errors/AlreadyExistsException.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/errors/AlreadyExistsException.java b/connect/runtime/src/main/java/org/apache/kafka/connect/errors/AlreadyExistsException.java
new file mode 100644
index 0000000..6fdefdf
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/errors/AlreadyExistsException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.errors;
+
+/**
+ * Indicates the operation tried to create an entity that already exists.
+ */
+public class AlreadyExistsException extends ConnectException {
+ public AlreadyExistsException(String s) {
+ super(s);
+ }
+
+ public AlreadyExistsException(String s, Throwable throwable) {
+ super(s, throwable);
+ }
+
+ public AlreadyExistsException(Throwable throwable) {
+ super(throwable);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/errors/NotFoundException.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/errors/NotFoundException.java b/connect/runtime/src/main/java/org/apache/kafka/connect/errors/NotFoundException.java
new file mode 100644
index 0000000..a3bbe91
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/errors/NotFoundException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.errors;
+
+/**
+ * Indicates that an operation attempted to modify or delete a connector or task that is not present on the worker.
+ */
+public class NotFoundException extends ConnectException {
+ public NotFoundException(String s) {
+ super(s);
+ }
+
+ public NotFoundException(String s, Throwable throwable) {
+ super(s, throwable);
+ }
+
+ public NotFoundException(Throwable throwable) {
+ super(throwable);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/errors/RetriableException.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/errors/RetriableException.java b/connect/runtime/src/main/java/org/apache/kafka/connect/errors/RetriableException.java
new file mode 100644
index 0000000..1b5b07a
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/errors/RetriableException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.errors;
+
+/**
+ * An exception that indicates the operation can be reattempted.
+ */
+public class RetriableException extends ConnectException {
+ public RetriableException(String s) {
+ super(s);
+ }
+
+ public RetriableException(String s, Throwable throwable) {
+ super(s, throwable);
+ }
+
+ public RetriableException(Throwable throwable) {
+ super(throwable);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
new file mode 100644
index 0000000..6611e5d
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This class ties together all the components of a Kafka Connect process (herder, worker,
+ * storage, command interface), managing their lifecycle.
+ */
+@InterfaceStability.Unstable
+public class Connect {
+ private static final Logger log = LoggerFactory.getLogger(Connect.class);
+
+ private final Worker worker;
+ private final Herder herder;
+ private final RestServer rest;
+ private final CountDownLatch startLatch = new CountDownLatch(1);
+ private final CountDownLatch stopLatch = new CountDownLatch(1);
+ private final AtomicBoolean shutdown = new AtomicBoolean(false);
+ private final ShutdownHook shutdownHook;
+
+ public Connect(Worker worker, Herder herder, RestServer rest) {
+ log.debug("Kafka Connect instance created");
+ this.worker = worker;
+ this.herder = herder;
+ this.rest = rest;
+ shutdownHook = new ShutdownHook();
+ }
+
+ public void start() {
+ log.info("Kafka Connect starting");
+ Runtime.getRuntime().addShutdownHook(shutdownHook);
+
+ worker.start();
+ herder.start();
+ rest.start(herder);
+
+ log.info("Kafka Connect started");
+
+ startLatch.countDown();
+ }
+
+ public void stop() {
+ boolean wasShuttingDown = shutdown.getAndSet(true);
+ if (!wasShuttingDown) {
+ log.info("Kafka Connect stopping");
+
+ rest.stop();
+ herder.stop();
+ worker.stop();
+
+ log.info("Kafka Connect stopped");
+ }
+
+ stopLatch.countDown();
+ }
+
+ public void awaitStop() {
+ try {
+ stopLatch.await();
+ } catch (InterruptedException e) {
+ log.error("Interrupted waiting for Kafka Connect to shutdown");
+ }
+ }
+
+ private class ShutdownHook extends Thread {
+ @Override
+ public void run() {
+ try {
+ startLatch.await();
+ Connect.this.stop();
+ } catch (InterruptedException e) {
+ log.error("Interrupted in shutdown hook while waiting for Kafka Connect startup to finish");
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
new file mode 100644
index 0000000..77cfc8d
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * <p>
+ * Configuration options for Connectors. These only include Kafka Connect system-level configuration
+ * options (e.g. Connector class name, timeouts used by Connect to control the connector) but do
+ * not include Connector-specific options (e.g. database connection settings).
+ * </p>
+ * <p>
+ * Note that some of these options are not required for all connectors. For example, TOPICS_CONFIG
+ * is sink-specific.
+ * </p>
+ */
+public class ConnectorConfig extends AbstractConfig {
+
+ public static final String NAME_CONFIG = "name";
+ private static final String NAME_DOC = "Globally unique name to use for this connector.";
+
+ public static final String CONNECTOR_CLASS_CONFIG = "connector.class";
+ private static final String CONNECTOR_CLASS_DOC =
+ "Name of the class for this connector. Must be a subclass of org.apache.kafka.connect.connector.Connector";
+
+ public static final String TASKS_MAX_CONFIG = "tasks.max";
+ private static final String TASKS_MAX_DOC = "Maximum number of tasks to use for this connector.";
+ public static final int TASKS_MAX_DEFAULT = 1;
+
+ public static final String TOPICS_CONFIG = "topics";
+ private static final String TOPICS_DOC = "";
+ public static final String TOPICS_DEFAULT = "";
+
+ private static ConfigDef config;
+
+ static {
+ config = new ConfigDef()
+ .define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC)
+ .define(CONNECTOR_CLASS_CONFIG, Type.CLASS, Importance.HIGH, CONNECTOR_CLASS_DOC)
+ .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, Importance.HIGH, TASKS_MAX_DOC)
+ .define(TOPICS_CONFIG, Type.LIST, TOPICS_DEFAULT, Importance.HIGH, TOPICS_DOC);
+ }
+
+ public ConnectorConfig() {
+ this(new HashMap<String, String>());
+ }
+
+ public ConnectorConfig(Map<String, String> props) {
+ super(config, props);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
new file mode 100644
index 0000000..fc0689c
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
+import org.apache.kafka.connect.util.Callback;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * <p>
+ * The herder interface tracks and manages workers and connectors. It is the main interface for external components
+ * to make changes to the state of the cluster. For example, in distributed mode, an implementation of this class
+ * knows how to accept a connector configuration, may need to route it to the current leader worker for the cluster so
+ * the config can be written to persistent storage, and then ensures the new connector is correctly instantiated on one
+ * of the workers.
+ * </p>
+ * <p>
+ * This class must implement all the actions that can be taken on the cluster (add/remove connectors, pause/resume tasks,
+ * get state of connectors and tasks, etc). The non-Java interfaces to the cluster (REST API and CLI) are very simple
+ * wrappers of the functionality provided by this interface.
+ * </p>
+ * <p>
+ * In standalone mode, this implementation of this class will be trivial because no coordination is needed. In that case,
+ * the implementation will mainly be delegating tasks directly to other components. For example, when creating a new
+ * connector in standalone mode, there is no need to persist the config and the connector and its tasks must run in the
+ * same process, so the standalone herder implementation can immediately instantiate and start the connector and its
+ * tasks.
+ * </p>
+ */
+public interface Herder {
+
+ void start();
+
+ void stop();
+
+ /**
+ * Get a list of connectors currently running in this cluster. This is a full list of connectors in the cluster gathered
+ * from the current configuration.
+ *
+ * @return A list of connector names
+ * @throws org.apache.kafka.connect.runtime.distributed.NotLeaderException if this node cannot resolve the request
+ * (e.g., because it has not joined the cluster or does not have configs in sync with the group) and it is
+ * also not the leader
+ * @throws ConnectException if this node is the leader, but still cannot resolve the
+ * request (e.g., it is not in sync with other workers' config state)
+ */
+ void connectors(Callback<Collection<String>> callback);
+
+ /**
+ * Get the definition and status of a connector.
+ */
+ void connectorInfo(String connName, Callback<ConnectorInfo> callback);
+
+ /**
+ * Get the configuration for a connector.
+ * @param connName name of the connector
+ * @param callback callback to invoke with the configuration
+ */
+ void connectorConfig(String connName, Callback<Map<String, String>> callback);
+
+ /**
+ * Set the configuration for a connector. This supports creation, update, and deletion.
+ * @param connName name of the connector
+ * @param config the connector's configuration, or null if deleting the connector
+ * @param allowReplace if true, allow overwriting previous configs; if false, throw AlreadyExistsException if a connector
+ * with the same name already exists
+ * @param callback callback to invoke when the configuration has been written
+ */
+ void putConnectorConfig(String connName, Map<String, String> config, boolean allowReplace, Callback<Created<ConnectorInfo>> callback);
+
+ /**
+ * Requests reconfiguration of the task. This should only be triggered by
+ * {@link HerderConnectorContext}.
+ *
+ * @param connName name of the connector that should be reconfigured
+ */
+ void requestTaskReconfiguration(String connName);
+
+ /**
+ * Get the configurations for the current set of tasks of a connector.
+ * @param connName connector to update
+ * @param callback callback to invoke upon completion
+ */
+ void taskConfigs(String connName, Callback<List<TaskInfo>> callback);
+
+ /**
+ * Set the configurations for the tasks of a connector. This should always include all tasks in the connector; if
+ * there are existing configurations and fewer are provided, this will reduce the number of tasks, and if more are
+ * provided it will increase the number of tasks.
+ * @param connName connector to update
+ * @param configs list of configurations
+ * @param callback callback to invoke upon completion
+ */
+ void putTaskConfigs(String connName, List<Map<String, String>> configs, Callback<Void> callback);
+
+
+ class Created<T> {
+ private final boolean created;
+ private final T result;
+
+ public Created(boolean created, T result) {
+ this.created = created;
+ this.result = result;
+ }
+
+ public boolean created() {
+ return created;
+ }
+
+ public T result() {
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Created<?> created1 = (Created<?>) o;
+ return Objects.equals(created, created1.created) &&
+ Objects.equals(result, created1.result);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(created, result);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java
new file mode 100644
index 0000000..070aa20
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.connector.ConnectorContext;
+
+/**
+ * ConnectorContext for use with the StandaloneHerder, which maintains all connectors and tasks
+ * in a single process.
+ */
+public class HerderConnectorContext implements ConnectorContext {
+
+ private Herder herder;
+ private String connectorName;
+
+ public HerderConnectorContext(Herder herder, String connectorName) {
+ this.herder = herder;
+ this.connectorName = connectorName;
+ }
+
+ @Override
+ public void requestTaskReconfiguration() {
+ // This is trivial to forward since there is only one herder and it's in memory in this
+ // process
+ herder.requestTaskReconfiguration(connectorName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
new file mode 100644
index 0000000..bee24e7
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * <p>
+ * Manages offset commit scheduling and execution for SourceTasks.
+ * </p>
+ * <p>
+ * Unlike sink tasks which directly manage their offset commits in the main poll() thread since
+ * they drive the event loop and control (for all intents and purposes) the timeouts, source
+ * tasks are at the whim of the connector and cannot be guaranteed to wake up on the necessary
+ * schedule. Instead, this class tracks all the active tasks, their schedule for commits, and
+ * ensures they are invoked in a timely fashion.
+ * </p>
+ */
+class SourceTaskOffsetCommitter {
+ private static final Logger log = LoggerFactory.getLogger(SourceTaskOffsetCommitter.class);
+
+ private Time time;
+ private WorkerConfig config;
+ private ScheduledExecutorService commitExecutorService = null;
+ private HashMap<ConnectorTaskId, ScheduledCommitTask> committers = new HashMap<>();
+
+ SourceTaskOffsetCommitter(Time time, WorkerConfig config) {
+ this.time = time;
+ this.config = config;
+ commitExecutorService = Executors.newSingleThreadScheduledExecutor();
+ }
+
+ public void close(long timeoutMs) {
+ commitExecutorService.shutdown();
+ try {
+ if (!commitExecutorService.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
+ log.error("Graceful shutdown of offset commitOffsets thread timed out.");
+ }
+ } catch (InterruptedException e) {
+ // ignore and allow to exit immediately
+ }
+ }
+
+ public void schedule(final ConnectorTaskId id, final WorkerSourceTask workerTask) {
+ synchronized (committers) {
+ long commitIntervalMs = config.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
+ ScheduledFuture<?> commitFuture = commitExecutorService.schedule(new Runnable() {
+ @Override
+ public void run() {
+ commit(id, workerTask);
+ }
+ }, commitIntervalMs, TimeUnit.MILLISECONDS);
+ committers.put(id, new ScheduledCommitTask(commitFuture));
+ }
+ }
+
+ public void remove(ConnectorTaskId id) {
+ final ScheduledCommitTask task;
+ synchronized (committers) {
+ task = committers.remove(id);
+ task.cancelled = true;
+ task.commitFuture.cancel(false);
+ }
+ if (task.finishedLatch != null) {
+ try {
+ task.finishedLatch.await();
+ } catch (InterruptedException e) {
+ throw new ConnectException("Unexpected interruption in SourceTaskOffsetCommitter.", e);
+ }
+ }
+ }
+
+ public void commit(ConnectorTaskId id, WorkerSourceTask workerTask) {
+ final ScheduledCommitTask task;
+ synchronized (committers) {
+ task = committers.get(id);
+ if (task == null || task.cancelled)
+ return;
+ task.finishedLatch = new CountDownLatch(1);
+ }
+
+ try {
+ log.debug("Committing offsets for {}", workerTask);
+ boolean success = workerTask.commitOffsets();
+ if (!success) {
+ log.error("Failed to commit offsets for {}", workerTask);
+ }
+ } catch (Throwable t) {
+ // We're very careful about exceptions here since any uncaught exceptions in the commit
+ // thread would cause the fixed interval schedule on the ExecutorService to stop running
+ // for that task
+ log.error("Unhandled exception when committing {}: ", workerTask, t);
+ } finally {
+ synchronized (committers) {
+ task.finishedLatch.countDown();
+ if (!task.cancelled)
+ schedule(id, workerTask);
+ }
+ }
+ }
+
+ private static class ScheduledCommitTask {
+ ScheduledFuture<?> commitFuture;
+ boolean cancelled;
+ CountDownLatch finishedLatch;
+
+ ScheduledCommitTask(ScheduledFuture<?> commitFuture) {
+ this.commitFuture = commitFuture;
+ this.cancelled = false;
+ this.finishedLatch = null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskConfig.java
new file mode 100644
index 0000000..48cb4d8
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskConfig.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * <p>
+ * Configuration options for Tasks. These only include Kafka Connect system-level configuration
+ * options.
+ * </p>
+ */
+public class TaskConfig extends AbstractConfig {
+
+ public static final String TASK_CLASS_CONFIG = "task.class";
+ private static final String TASK_CLASS_DOC =
+ "Name of the class for this task. Must be a subclass of org.apache.kafka.connect.connector.Task";
+
+ private static ConfigDef config;
+
+ static {
+ config = new ConfigDef()
+ .define(TASK_CLASS_CONFIG, Type.CLASS, Importance.HIGH, TASK_CLASS_DOC);
+ }
+
+ public TaskConfig() {
+ this(new HashMap<String, String>());
+ }
+
+ public TaskConfig(Map<String, ?> props) {
+ super(config, props);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
new file mode 100644
index 0000000..2e359d6
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.storage.*;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * <p>
+ * Worker runs a (dynamic) set of tasks in a set of threads, doing the work of actually moving
+ * data to/from Kafka.
+ * </p>
+ * <p>
+ * Since each task has a dedicated thread, this is mainly just a container for them.
+ * </p>
+ */
+public class Worker {
+ private static final Logger log = LoggerFactory.getLogger(Worker.class);
+
+ private Time time;
+ private WorkerConfig config;
+ private Converter keyConverter;
+ private Converter valueConverter;
+ private Converter internalKeyConverter;
+ private Converter internalValueConverter;
+ private OffsetBackingStore offsetBackingStore;
+ private HashMap<String, Connector> connectors = new HashMap<>();
+ private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
+ private KafkaProducer<byte[], byte[]> producer;
+ private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
+
+ public Worker(WorkerConfig config, OffsetBackingStore offsetBackingStore) {
+ this(new SystemTime(), config, offsetBackingStore);
+ }
+
+ @SuppressWarnings("unchecked")
+ public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore) {
+ this.time = time;
+ this.config = config;
+ this.keyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
+ this.keyConverter.configure(config.originalsWithPrefix("key.converter."), true);
+ this.valueConverter = config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
+ this.valueConverter.configure(config.originalsWithPrefix("value.converter."), false);
+ this.internalKeyConverter = config.getConfiguredInstance(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Converter.class);
+ this.internalKeyConverter.configure(config.originalsWithPrefix("internal.key.converter."), true);
+ this.internalValueConverter = config.getConfiguredInstance(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
+ this.internalValueConverter.configure(config.originalsWithPrefix("internal.value.converter."), false);
+
+ this.offsetBackingStore = offsetBackingStore;
+ this.offsetBackingStore.configure(config.originals());
+ }
+
+ public void start() {
+ log.info("Worker starting");
+
+ Map<String, Object> producerProps = new HashMap<>();
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ producerProps.putAll(config.unusedConfigs());
+
+ producer = new KafkaProducer<>(producerProps);
+
+ offsetBackingStore.start();
+ sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(time, config);
+
+ log.info("Worker started");
+ }
+
+ public void stop() {
+ log.info("Worker stopping");
+
+ long started = time.milliseconds();
+ long limit = started + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
+
+ for (Map.Entry<String, Connector> entry : connectors.entrySet()) {
+ Connector conn = entry.getValue();
+ log.warn("Shutting down connector {} uncleanly; herder should have shut down connectors before the" +
+ "Worker is stopped.", conn);
+ try {
+ conn.stop();
+ } catch (ConnectException e) {
+ log.error("Error while shutting down connector " + conn, e);
+ }
+ }
+
+ for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) {
+ WorkerTask task = entry.getValue();
+ log.warn("Shutting down task {} uncleanly; herder should have shut down "
+ + "tasks before the Worker is stopped.", task);
+ try {
+ task.stop();
+ } catch (ConnectException e) {
+ log.error("Error while shutting down task " + task, e);
+ }
+ }
+
+ for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) {
+ WorkerTask task = entry.getValue();
+ log.debug("Waiting for task {} to finish shutting down", task);
+ if (!task.awaitStop(Math.max(limit - time.milliseconds(), 0)))
+ log.error("Graceful shutdown of task {} failed.", task);
+ task.close();
+ }
+
+ long timeoutMs = limit - time.milliseconds();
+ sourceTaskOffsetCommitter.close(timeoutMs);
+
+ offsetBackingStore.stop();
+
+ log.info("Worker stopped");
+ }
+
+ public WorkerConfig config() {
+ return config;
+ }
+
+ /**
+ * Add a new connector.
+ * @param connConfig connector configuration
+ * @param ctx context for the connector
+ */
+ public void addConnector(ConnectorConfig connConfig, ConnectorContext ctx) {
+ String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
+ Class<?> maybeConnClass = connConfig.getClass(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+ log.info("Creating connector {} of type {}", connName, maybeConnClass.getName());
+
+ Class<? extends Connector> connClass;
+ try {
+ connClass = maybeConnClass.asSubclass(Connector.class);
+ } catch (ClassCastException e) {
+ throw new ConnectException("Specified class is not a subclass of Connector: " + maybeConnClass.getName());
+ }
+
+ if (connectors.containsKey(connName))
+ throw new ConnectException("Connector with name " + connName + " already exists");
+
+ final Connector connector = instantiateConnector(connClass);
+ log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connClass.getName());
+ connector.initialize(ctx);
+ try {
+ connector.start(connConfig.originalsStrings());
+ } catch (ConnectException e) {
+ throw new ConnectException("Connector threw an exception while starting", e);
+ }
+
+ connectors.put(connName, connector);
+
+ log.info("Finished creating connector {}", connName);
+ }
+
+ private static Connector instantiateConnector(Class<? extends Connector> connClass) {
+ try {
+ return Utils.newInstance(connClass);
+ } catch (Throwable t) {
+ // Catches normal exceptions due to instantiation errors as well as any runtime errors that
+ // may be caused by user code
+ throw new ConnectException("Failed to create connector instance", t);
+ }
+ }
+
+ public List<Map<String, String>> connectorTaskConfigs(String connName, int maxTasks, List<String> sinkTopics) {
+ log.trace("Reconfiguring connector tasks for {}", connName);
+
+ Connector connector = connectors.get(connName);
+ if (connector == null)
+ throw new ConnectException("Connector " + connName + " not found in this worker.");
+
+ List<Map<String, String>> result = new ArrayList<>();
+ String taskClassName = connector.taskClass().getName();
+ for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) {
+ Map<String, String> taskConfig = new HashMap<>(taskProps); // Ensure we don't modify the connector's copy of the config
+ taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName);
+ if (sinkTopics != null)
+ taskConfig.put(SinkTask.TOPICS_CONFIG, Utils.join(sinkTopics, ","));
+ result.add(taskConfig);
+ }
+ return result;
+ }
+
+ public void stopConnector(String connName) {
+ log.info("Stopping connector {}", connName);
+
+ Connector connector = connectors.get(connName);
+ if (connector == null)
+ throw new ConnectException("Connector " + connName + " not found in this worker.");
+
+ try {
+ connector.stop();
+ } catch (ConnectException e) {
+ log.error("Error shutting down connector {}: ", connector, e);
+ }
+
+ connectors.remove(connName);
+
+ log.info("Stopped connector {}", connName);
+ }
+
+ /**
+ * Get the IDs of the connectors currently running in this worker.
+ */
+ public Set<String> connectorNames() {
+ return connectors.keySet();
+ }
+
+ /**
+ * Add a new task.
+ * @param id Globally unique ID for this task.
+ * @param taskConfig the parsed task configuration
+ */
+ public void addTask(ConnectorTaskId id, TaskConfig taskConfig) {
+ log.info("Creating task {}", id);
+
+ if (tasks.containsKey(id)) {
+ String msg = "Task already exists in this worker; the herder should not have requested "
+ + "that this : " + id;
+ log.error(msg);
+ throw new ConnectException(msg);
+ }
+
+ Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
+ final Task task = instantiateTask(taskClass);
+ log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName());
+
+ // Decide which type of worker task we need based on the type of task.
+ final WorkerTask workerTask;
+ if (task instanceof SourceTask) {
+ SourceTask sourceTask = (SourceTask) task;
+ OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
+ internalKeyConverter, internalValueConverter);
+ OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
+ internalKeyConverter, internalValueConverter);
+ workerTask = new WorkerSourceTask(id, sourceTask, keyConverter, valueConverter, producer,
+ offsetReader, offsetWriter, config, time);
+ } else if (task instanceof SinkTask) {
+ workerTask = new WorkerSinkTask(id, (SinkTask) task, config, keyConverter, valueConverter, time);
+ } else {
+ log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
+ throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
+ }
+
+ // Start the task before adding modifying any state, any exceptions are caught higher up the
+ // call chain and there's no cleanup to do here
+ workerTask.start(taskConfig.originalsStrings());
+ if (task instanceof SourceTask) {
+ WorkerSourceTask workerSourceTask = (WorkerSourceTask) workerTask;
+ sourceTaskOffsetCommitter.schedule(id, workerSourceTask);
+ }
+ tasks.put(id, workerTask);
+ }
+
+ private static Task instantiateTask(Class<? extends Task> taskClass) {
+ try {
+ return Utils.newInstance(taskClass);
+ } catch (KafkaException e) {
+ throw new ConnectException("Task class not found", e);
+ }
+ }
+
+ public void stopTask(ConnectorTaskId id) {
+ log.info("Stopping task {}", id);
+
+ WorkerTask task = getTask(id);
+ if (task instanceof WorkerSourceTask)
+ sourceTaskOffsetCommitter.remove(id);
+ task.stop();
+ if (!task.awaitStop(config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG)))
+ log.error("Graceful stop of task {} failed.", task);
+ task.close();
+ tasks.remove(id);
+ }
+
+ /**
+ * Get the IDs of the tasks currently running in this worker.
+ */
+ public Set<ConnectorTaskId> taskIds() {
+ return tasks.keySet();
+ }
+
+ private WorkerTask getTask(ConnectorTaskId id) {
+ WorkerTask task = tasks.get(id);
+ if (task == null) {
+ log.error("Task not found: " + id);
+ throw new ConnectException("Task not found: " + id);
+ }
+ return task;
+ }
+
+ public Converter getInternalKeyConverter() {
+ return internalKeyConverter;
+ }
+
+ public Converter getInternalValueConverter() {
+ return internalValueConverter;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
new file mode 100644
index 0000000..4ecacbb
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+
+import java.util.Map;
+
+/**
+ * Common base class providing configuration for Kafka Connect workers, whether standalone or distributed.
+ */
+@InterfaceStability.Unstable
+public class WorkerConfig extends AbstractConfig {
+
+ public static final String CLUSTER_CONFIG = "cluster";
+ private static final String CLUSTER_CONFIG_DOC =
+ "ID for this cluster, which is used to provide a namespace so multiple Kafka Connect clusters "
+ + "or instances may co-exist while sharing a single Kafka cluster.";
+ public static final String CLUSTER_DEFAULT = "connect";
+
+ public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
+ public static final String BOOTSTRAP_SERVERS_DOC
+ = "A list of host/port pairs to use for establishing the initial connection to the Kafka "
+ + "cluster. The client will make use of all servers irrespective of which servers are "
+ + "specified here for bootstrapping—this list only impacts the initial hosts used "
+ + "to discover the full set of servers. This list should be in the form "
+ + "<code>host1:port1,host2:port2,...</code>. Since these servers are just used for the "
+ + "initial connection to discover the full cluster membership (which may change "
+ + "dynamically), this list need not contain the full set of servers (you may want more "
+ + "than one, though, in case a server is down).";
+ public static final String BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092";
+
+ public static final String KEY_CONVERTER_CLASS_CONFIG = "key.converter";
+ public static final String KEY_CONVERTER_CLASS_DOC =
+ "Converter class for key Connect data that implements the <code>Converter</code> interface.";
+
+ public static final String VALUE_CONVERTER_CLASS_CONFIG = "value.converter";
+ public static final String VALUE_CONVERTER_CLASS_DOC =
+ "Converter class for value Connect data that implements the <code>Converter</code> interface.";
+
+ public static final String INTERNAL_KEY_CONVERTER_CLASS_CONFIG = "internal.key.converter";
+ public static final String INTERNAL_KEY_CONVERTER_CLASS_DOC =
+ "Converter class for internal key Connect data that implements the <code>Converter</code> interface. Used for converting data like offsets and configs.";
+
+ public static final String INTERNAL_VALUE_CONVERTER_CLASS_CONFIG = "internal.value.converter";
+ public static final String INTERNAL_VALUE_CONVERTER_CLASS_DOC =
+ "Converter class for offset value Connect data that implements the <code>Converter</code> interface. Used for converting data like offsets and configs.";
+
+ public static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG
+ = "task.shutdown.graceful.timeout.ms";
+ private static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC =
+ "Amount of time to wait for tasks to shutdown gracefully. This is the total amount of time,"
+ + " not per task. All task have shutdown triggered, then they are waited on sequentially.";
+ private static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT = "5000";
+
+ public static final String OFFSET_COMMIT_INTERVAL_MS_CONFIG = "offset.flush.interval.ms";
+ private static final String OFFSET_COMMIT_INTERVAL_MS_DOC
+ = "Interval at which to try committing offsets for tasks.";
+ public static final long OFFSET_COMMIT_INTERVAL_MS_DEFAULT = 60000L;
+
+ public static final String OFFSET_COMMIT_TIMEOUT_MS_CONFIG = "offset.flush.timeout.ms";
+ private static final String OFFSET_COMMIT_TIMEOUT_MS_DOC
+ = "Maximum number of milliseconds to wait for records to flush and partition offset data to be"
+ + " committed to offset storage before cancelling the process and restoring the offset "
+ + "data to be committed in a future attempt.";
+ public static final long OFFSET_COMMIT_TIMEOUT_MS_DEFAULT = 5000L;
+
+ public static final String REST_HOST_NAME_CONFIG = "rest.host.name";
+ private static final String REST_HOST_NAME_DOC
+ = "Hostname for the REST API. If this is set, it will only bind to this interface.";
+
+ public static final String REST_PORT_CONFIG = "rest.port";
+ private static final String REST_PORT_DOC
+ = "Port for the REST API to listen on.";
+ public static final int REST_PORT_DEFAULT = 8083;
+
+ public static final String REST_ADVERTISED_HOST_NAME_CONFIG = "rest.advertised.host.name";
+ private static final String REST_ADVERTISED_HOST_NAME_DOC
+ = "If this is set, this is the hostname that will be given out to other workers to connect to.";
+
+ public static final String REST_ADVERTISED_PORT_CONFIG = "rest.advertised.port";
+ private static final String REST_ADVERTISED_PORT_DOC
+ = "If this is set, this is the port that will be given out to other workers to connect to.";
+
+ /**
+ * Get a basic ConfigDef for a WorkerConfig. This includes all the common settings. Subclasses can use this to
+ * bootstrap their own ConfigDef.
+ * @return a ConfigDef with all the common options specified
+ */
+ protected static ConfigDef baseConfigDef() {
+ return new ConfigDef()
+ .define(CLUSTER_CONFIG, Type.STRING, CLUSTER_DEFAULT, Importance.HIGH, CLUSTER_CONFIG_DOC)
+ .define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, BOOTSTRAP_SERVERS_DEFAULT,
+ Importance.HIGH, BOOTSTRAP_SERVERS_DOC)
+ .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS,
+ Importance.HIGH, KEY_CONVERTER_CLASS_DOC)
+ .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
+ Importance.HIGH, VALUE_CONVERTER_CLASS_DOC)
+ .define(INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Type.CLASS,
+ Importance.HIGH, INTERNAL_KEY_CONVERTER_CLASS_DOC)
+ .define(INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
+ Importance.HIGH, INTERNAL_VALUE_CONVERTER_CLASS_DOC)
+ .define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG,
+ TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, Importance.LOW,
+ TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC)
+ .define(OFFSET_COMMIT_INTERVAL_MS_CONFIG, Type.LONG, OFFSET_COMMIT_INTERVAL_MS_DEFAULT,
+ Importance.LOW, OFFSET_COMMIT_INTERVAL_MS_DOC)
+ .define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, Type.LONG, OFFSET_COMMIT_TIMEOUT_MS_DEFAULT,
+ Importance.LOW, OFFSET_COMMIT_TIMEOUT_MS_DOC)
+ .define(REST_HOST_NAME_CONFIG, Type.STRING, null, Importance.LOW, REST_HOST_NAME_DOC)
+ .define(REST_PORT_CONFIG, Type.INT, REST_PORT_DEFAULT, Importance.LOW, REST_PORT_DOC)
+ .define(REST_ADVERTISED_HOST_NAME_CONFIG, Type.STRING, null, Importance.LOW, REST_ADVERTISED_HOST_NAME_DOC)
+ .define(REST_ADVERTISED_PORT_CONFIG, Type.INT, null, Importance.LOW, REST_ADVERTISED_PORT_DOC);
+ }
+
+ public WorkerConfig(ConfigDef definition, Map<String, String> props) {
+ super(definition, props);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
new file mode 100644
index 0000000..a4d4093
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * WorkerTask that uses a SinkTask to export data from Kafka.
+ */
+class WorkerSinkTask implements WorkerTask {
+ private static final Logger log = LoggerFactory.getLogger(WorkerSinkTask.class);
+
+ private final ConnectorTaskId id;
+ private final SinkTask task;
+ private final WorkerConfig workerConfig;
+ private final Time time;
+ private final Converter keyConverter;
+ private final Converter valueConverter;
+ private WorkerSinkTaskThread workThread;
+ private Map<String, String> taskProps;
+ private KafkaConsumer<byte[], byte[]> consumer;
+ private WorkerSinkTaskContext context;
+ private boolean started;
+ private final List<SinkRecord> messageBatch;
+ private Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets;
+ private Map<TopicPartition, OffsetAndMetadata> currentOffsets;
+ private boolean pausedForRedelivery;
+
+ public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig,
+ Converter keyConverter, Converter valueConverter, Time time) {
+ this.id = id;
+ this.task = task;
+ this.workerConfig = workerConfig;
+ this.keyConverter = keyConverter;
+ this.valueConverter = valueConverter;
+ this.time = time;
+ this.started = false;
+ this.messageBatch = new ArrayList<>();
+ this.currentOffsets = new HashMap<>();
+ this.pausedForRedelivery = false;
+ }
+
+ @Override
+ public void start(Map<String, String> props) {
+ taskProps = props;
+ consumer = createConsumer();
+ context = new WorkerSinkTaskContext(consumer);
+
+ workThread = createWorkerThread();
+ workThread.start();
+ }
+
+ @Override
+ public void stop() {
+ // Offset commit is handled upon exit in work thread
+ if (workThread != null)
+ workThread.startGracefulShutdown();
+ consumer.wakeup();
+ }
+
+ @Override
+ public boolean awaitStop(long timeoutMs) {
+ boolean success = true;
+ if (workThread != null) {
+ try {
+ success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS);
+ if (!success)
+ workThread.forceShutdown();
+ } catch (InterruptedException e) {
+ success = false;
+ }
+ }
+ task.stop();
+ return success;
+ }
+
+ @Override
+ public void close() {
+ // FIXME Kafka needs to add a timeout parameter here for us to properly obey the timeout
+ // passed in
+ if (consumer != null)
+ consumer.close();
+ }
+
+ /**
+ * Preforms initial join process for consumer group, ensures we have an assignment, and initializes + starts the
+ * SinkTask.
+ *
+ * @returns true if successful, false if joining the consumer group was interrupted
+ */
+ public boolean joinConsumerGroupAndStart() {
+ String topicsStr = taskProps.get(SinkTask.TOPICS_CONFIG);
+ if (topicsStr == null || topicsStr.isEmpty())
+ throw new ConnectException("Sink tasks require a list of topics.");
+ String[] topics = topicsStr.split(",");
+ log.debug("Task {} subscribing to topics {}", id, topics);
+ consumer.subscribe(Arrays.asList(topics), new HandleRebalance());
+
+ // Ensure we're in the group so that if start() wants to rewind offsets, it will have an assignment of partitions
+ // to work with. Any rewinding will be handled immediately when polling starts.
+ try {
+ consumer.poll(0);
+ } catch (WakeupException e) {
+ log.error("Sink task {} was stopped before completing join group. Task initialization and start is being skipped", this);
+ return false;
+ }
+ task.initialize(context);
+ task.start(taskProps);
+ log.info("Sink task {} finished initialization and start", this);
+ started = true;
+ return true;
+ }
+
+ /** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */
+ public void poll(long timeoutMs) {
+ try {
+ rewind();
+ long retryTimeout = context.timeout();
+ if (retryTimeout > 0) {
+ timeoutMs = Math.min(timeoutMs, retryTimeout);
+ context.timeout(-1L);
+ }
+
+ log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
+ ConsumerRecords<byte[], byte[]> msgs = consumer.poll(timeoutMs);
+ assert messageBatch.isEmpty() || msgs.isEmpty();
+ log.trace("{} polling returned {} messages", id, msgs.count());
+
+ convertMessages(msgs);
+ deliverMessages();
+ } catch (WakeupException we) {
+ log.trace("{} consumer woken up", id);
+ }
+ }
+
+ /**
+ * Starts an offset commit by flushing outstanding messages from the task and then starting
+ * the write commit. This should only be invoked by the WorkerSinkTaskThread.
+ **/
+ public void commitOffsets(boolean sync, final int seqno) {
+ log.info("{} Committing offsets", this);
+
+ final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(currentOffsets);
+
+ try {
+ task.flush(offsets);
+ } catch (Throwable t) {
+ log.error("Commit of {} offsets failed due to exception while flushing: {}", this, t);
+ log.error("Rewinding offsets to last committed offsets");
+ for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : lastCommittedOffsets.entrySet()) {
+ log.debug("{} Rewinding topic partition {} to offset {}", id, entry.getKey(), entry.getValue().offset());
+ consumer.seek(entry.getKey(), entry.getValue().offset());
+ }
+ currentOffsets = new HashMap<>(lastCommittedOffsets);
+ workThread.onCommitCompleted(t, seqno);
+ return;
+ }
+
+ if (sync) {
+ try {
+ consumer.commitSync(offsets);
+ lastCommittedOffsets = offsets;
+ workThread.onCommitCompleted(null, seqno);
+ } catch (KafkaException e) {
+ workThread.onCommitCompleted(e, seqno);
+ }
+ } else {
+ OffsetCommitCallback cb = new OffsetCommitCallback() {
+ @Override
+ public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception error) {
+ lastCommittedOffsets = offsets;
+ workThread.onCommitCompleted(error, seqno);
+ }
+ };
+ consumer.commitAsync(offsets, cb);
+ }
+ }
+
+ public Time time() {
+ return time;
+ }
+
+ public WorkerConfig workerConfig() {
+ return workerConfig;
+ }
+
+ private KafkaConsumer<byte[], byte[]> createConsumer() {
+ // Include any unknown worker configs so consumer configs can be set globally on the worker
+ // and through to the task
+ Map<String, Object> props = workerConfig.unusedConfigs();
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "connect-" + id.connector());
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+
+ KafkaConsumer<byte[], byte[]> newConsumer;
+ try {
+ newConsumer = new KafkaConsumer<>(props);
+ } catch (Throwable t) {
+ throw new ConnectException("Failed to create consumer", t);
+ }
+
+ return newConsumer;
+ }
+
+ private WorkerSinkTaskThread createWorkerThread() {
+ return new WorkerSinkTaskThread(this, "WorkerSinkTask-" + id, time, workerConfig);
+ }
+
+ private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) {
+ for (ConsumerRecord<byte[], byte[]> msg : msgs) {
+ log.trace("Consuming message with key {}, value {}", msg.key(), msg.value());
+ SchemaAndValue keyAndSchema = keyConverter.toConnectData(msg.topic(), msg.key());
+ SchemaAndValue valueAndSchema = valueConverter.toConnectData(msg.topic(), msg.value());
+ messageBatch.add(
+ new SinkRecord(msg.topic(), msg.partition(),
+ keyAndSchema.schema(), keyAndSchema.value(),
+ valueAndSchema.schema(), valueAndSchema.value(),
+ msg.offset())
+ );
+ }
+ }
+
+ private void deliverMessages() {
+ // Finally, deliver this batch to the sink
+ try {
+ // Since we reuse the messageBatch buffer, ensure we give the task its own copy
+ task.put(new ArrayList<>(messageBatch));
+ for (SinkRecord record : messageBatch)
+ currentOffsets.put(new TopicPartition(record.topic(), record.kafkaPartition()),
+ new OffsetAndMetadata(record.kafkaOffset() + 1));
+ messageBatch.clear();
+ // If we had paused all consumer topic partitions to try to redeliver data, then we should resume any that
+ // the task had not explicitly paused
+ if (pausedForRedelivery) {
+ for (TopicPartition tp : consumer.assignment())
+ if (!context.pausedPartitions().contains(tp))
+ consumer.resume(tp);
+ pausedForRedelivery = false;
+ }
+ } catch (RetriableException e) {
+ log.error("RetriableException from SinkTask {}: {}", id, e);
+ // If we're retrying a previous batch, make sure we've paused all topic partitions so we don't get new data,
+ // but will still be able to poll in order to handle user-requested timeouts, keep group membership, etc.
+ pausedForRedelivery = true;
+ for (TopicPartition tp : consumer.assignment())
+ consumer.pause(tp);
+ // Let this exit normally, the batch will be reprocessed on the next loop.
+ } catch (Throwable t) {
+ log.error("Task {} threw an uncaught and unrecoverable exception", id);
+ log.error("Task is being killed and will not recover until manually restarted:", t);
+ throw new ConnectException("Exiting WorkerSinkTask due to unrecoverable exception.");
+ }
+ }
+
+ private void rewind() {
+ Map<TopicPartition, Long> offsets = context.offsets();
+ if (offsets.isEmpty()) {
+ return;
+ }
+ for (TopicPartition tp: offsets.keySet()) {
+ Long offset = offsets.get(tp);
+ if (offset != null) {
+ log.trace("Rewind {} to offset {}.", tp, offset);
+ consumer.seek(tp, offset);
+ lastCommittedOffsets.put(tp, new OffsetAndMetadata(offset));
+ currentOffsets.put(tp, new OffsetAndMetadata(offset));
+ }
+ }
+ context.clearOffsets();
+ }
+
+ private class HandleRebalance implements ConsumerRebalanceListener {
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+ lastCommittedOffsets = new HashMap<>();
+ currentOffsets = new HashMap<>();
+ for (TopicPartition tp : partitions) {
+ long pos = consumer.position(tp);
+ lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
+ currentOffsets.put(tp, new OffsetAndMetadata(pos));
+ log.debug("{} assigned topic partition {} with offset {}", id, tp, pos);
+ }
+
+ // If we paused everything for redelivery (which is no longer relevant since we discarded the data), make
+ // sure anything we paused that the task didn't request to be paused *and* which we still own is resumed.
+ // Also make sure our tracking of paused partitions is updated to remove any partitions we no longer own.
+ if (pausedForRedelivery) {
+ pausedForRedelivery = false;
+ Set<TopicPartition> assigned = new HashSet<>(partitions);
+ Set<TopicPartition> taskPaused = context.pausedPartitions();
+
+ for (TopicPartition tp : partitions) {
+ if (!taskPaused.contains(tp))
+ consumer.resume(tp);
+ }
+
+ Iterator<TopicPartition> tpIter = taskPaused.iterator();
+ while (tpIter.hasNext()) {
+ TopicPartition tp = tpIter.next();
+ if (assigned.contains(tp))
+ tpIter.remove();
+ }
+ }
+
+ // Instead of invoking the assignment callback on initialization, we guarantee the consumer is ready upon
+ // task start. Since this callback gets invoked during that initial setup before we've started the task, we
+ // need to guard against invoking the user's callback method during that period.
+ if (started)
+ task.onPartitionsAssigned(partitions);
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+ task.onPartitionsRevoked(partitions);
+ commitOffsets(true, -1);
+ // Make sure we don't have any leftover data since offsets will be reset to committed positions
+ messageBatch.clear();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
new file mode 100644
index 0000000..06f4838
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at <p/> http://www.apache.org/licenses/LICENSE-2.0 <p/> Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.IllegalWorkerStateException;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class WorkerSinkTaskContext implements SinkTaskContext {
+ private Map<TopicPartition, Long> offsets;
+ private long timeoutMs;
+ private KafkaConsumer<byte[], byte[]> consumer;
+ private final Set<TopicPartition> pausedPartitions;
+
+ public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer) {
+ this.offsets = new HashMap<>();
+ this.timeoutMs = -1L;
+ this.consumer = consumer;
+ this.pausedPartitions = new HashSet<>();
+ }
+
+ @Override
+ public void offset(Map<TopicPartition, Long> offsets) {
+ this.offsets.putAll(offsets);
+ }
+
+ @Override
+ public void offset(TopicPartition tp, long offset) {
+ offsets.put(tp, offset);
+ }
+
+ public void clearOffsets() {
+ offsets.clear();
+ }
+
+ /**
+ * Get offsets that the SinkTask has submitted to be reset. Used by the Kafka Connect framework.
+ * @return the map of offsets
+ */
+ public Map<TopicPartition, Long> offsets() {
+ return offsets;
+ }
+
+ @Override
+ public void timeout(long timeoutMs) {
+ this.timeoutMs = timeoutMs;
+ }
+
+ /**
+ * Get the timeout in milliseconds set by SinkTasks. Used by the Kafka Connect framework.
+ * @return the backoff timeout in milliseconds.
+ */
+ public long timeout() {
+ return timeoutMs;
+ }
+
+ @Override
+ public Set<TopicPartition> assignment() {
+ if (consumer == null) {
+ throw new IllegalWorkerStateException("SinkTaskContext may not be used to look up partition assignment until the task is initialized");
+ }
+ return consumer.assignment();
+ }
+
+ @Override
+ public void pause(TopicPartition... partitions) {
+ if (consumer == null) {
+ throw new IllegalWorkerStateException("SinkTaskContext may not be used to pause consumption until the task is initialized");
+ }
+ try {
+ for (TopicPartition partition : partitions)
+ pausedPartitions.add(partition);
+ consumer.pause(partitions);
+ } catch (IllegalStateException e) {
+ throw new IllegalWorkerStateException("SinkTasks may not pause partitions that are not currently assigned to them.", e);
+ }
+ }
+
+ @Override
+ public void resume(TopicPartition... partitions) {
+ if (consumer == null) {
+ throw new IllegalWorkerStateException("SinkTaskContext may not be used to resume consumption until the task is initialized");
+ }
+ try {
+ for (TopicPartition partition : partitions)
+ pausedPartitions.remove(partition);
+ consumer.resume(partitions);
+ } catch (IllegalStateException e) {
+ throw new IllegalWorkerStateException("SinkTasks may not resume partitions that are not currently assigned to them.", e);
+ }
+ }
+
+ public Set<TopicPartition> pausedPartitions() {
+ return pausedPartitions;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java
new file mode 100644
index 0000000..e776f08
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.util.ShutdownableThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Worker thread for a WorkerSinkTask. These classes are very tightly coupled, but separated to
+ * simplify testing.
+ */
/**
 * Worker thread for a WorkerSinkTask. These classes are very tightly coupled, but separated to
 * simplify testing.
 */
class WorkerSinkTaskThread extends ShutdownableThread {
    // NOTE(review): logger is keyed on WorkerSinkTask.class rather than this class —
    // presumably intentional given the tight coupling, so thread and task log under one
    // logger name; confirm.
    private static final Logger log = LoggerFactory.getLogger(WorkerSinkTask.class);

    private final WorkerSinkTask task;
    // Absolute timestamp (ms) at which the next periodic offset commit is due.
    private long nextCommit;
    // Whether an offset commit is currently in flight. Written under synchronized(this)
    // along with commitSeqno/commitStarted; iteration() reads it without the lock.
    private boolean committing;
    // Sequence number of the most recently started commit; used by onCommitCompleted to
    // match async completion callbacks to the commit they belong to.
    private int commitSeqno;
    // Timestamp (ms) at which the in-flight commit started; -1 until the first commit.
    private long commitStarted;
    // Count of commit failures (timeouts or errors); reset to 0 on a successful commit.
    private int commitFailures;

    public WorkerSinkTaskThread(WorkerSinkTask task, String name, Time time,
                                WorkerConfig workerConfig) {
        super(name);
        this.task = task;
        // First commit is due one full interval from construction time.
        this.nextCommit = time.milliseconds() +
                workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
        this.committing = false;
        this.commitSeqno = 0;
        this.commitStarted = -1;
        this.commitFailures = 0;
    }

    @Override
    public void execute() {
        // Try to join and start. If we're interrupted before this completes, bail.
        if (!task.joinConsumerGroupAndStart())
            return;

        while (getRunning()) {
            iteration();
        }

        // Make sure any uncommitted data has committed
        task.commitOffsets(true, -1);
    }

    /**
     * One pass of the main loop: start a periodic offset commit if one is due, mark any
     * in-flight commit that exceeded the configured timeout as failed, then poll the task
     * for messages until roughly the next commit deadline.
     */
    public void iteration() {
        long now = task.time().milliseconds();

        // Maybe commit
        if (!committing && now >= nextCommit) {
            // Commit bookkeeping is updated atomically so onCommitCompleted (invoked from
            // the commit callback) sees a consistent seqno/start-time pair.
            synchronized (this) {
                committing = true;
                commitSeqno += 1;
                commitStarted = now;
            }
            task.commitOffsets(false, commitSeqno);
            // Advance the deadline by a fixed interval (rather than from 'now') so the
            // commit schedule does not drift with processing latency.
            nextCommit += task.workerConfig().getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
        }

        // Check for timed out commits
        long commitTimeout = commitStarted + task.workerConfig().getLong(
                WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
        if (committing && now >= commitTimeout) {
            log.warn("Commit of {} offsets timed out", this);
            commitFailures++;
            committing = false;
        }

        // And process messages
        long timeoutMs = Math.max(nextCommit - now, 0);
        task.poll(timeoutMs);
    }

    /**
     * Completion callback for a commit started in iteration(). Callbacks whose seqno does
     * not match the latest commit are stale (e.g. the commit already timed out and a newer
     * one started) and are ignored so they cannot clobber the newer commit's state.
     *
     * @param error null on success, otherwise the cause of the failure
     * @param seqno sequence number identifying which commit this callback is for
     */
    public void onCommitCompleted(Throwable error, long seqno) {
        synchronized (this) {
            if (commitSeqno != seqno) {
                log.debug("Got callback for timed out commit {}: {}, but most recent commit is {}",
                        this,
                        seqno, commitSeqno);
            } else {
                if (error != null) {
                    log.error("Commit of {} offsets threw an unexpected exception: ", this, error);
                    commitFailures++;
                } else {
                    log.debug("Finished {} offset commit successfully in {} ms",
                            this, task.time().milliseconds() - commitStarted);
                    commitFailures = 0;
                }
                committing = false;
            }
        }
    }

    // NOTE(review): read without synchronization while writers hold the lock — callers
    // presumably tolerate a slightly stale value; confirm.
    public int commitFailures() {
        return commitFailures;
    }
}