You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2018/05/18 17:41:28 UTC
[kafka] branch 0.11.0 updated: KAFKA-6566: Improve Connect Resource
Cleanup
This is an automated email from the ASF dual-hosted git repository.
ewencp pushed a commit to branch 0.11.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/0.11.0 by this push:
new 9e4d504 KAFKA-6566: Improve Connect Resource Cleanup
9e4d504 is described below
commit 9e4d50472205ae1f56db7b1a0a2cb5f60d2ab5e4
Author: Robert Yokota <ra...@gmail.com>
AuthorDate: Fri May 18 10:39:34 2018 -0700
KAFKA-6566: Improve Connect Resource Cleanup
This is a change to improve resource cleanup for sink tasks and source tasks. Now `Task.stop()` is called from both `WorkerSinkTask.close()` and `WorkerSourceTask.close()`.
It is called from `WorkerXXXTask.close()` since this method is called in the `finally` block of `WorkerTask.run()`, and Connect developers use `stop()` to clean up resources.
Author: Robert Yokota <ra...@gmail.com>
Reviewers: Randall Hauch <rh...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #5020 from rayokota/K6566-improve-connect-resource-cleanup
(cherry picked from commit ee8abb2f7053575bd2abec8152907e0642b1d713)
Signed-off-by: Ewen Cheslack-Postava <me...@ewencp.org>
---
.../kafka/connect/runtime/WorkerSinkTask.java | 21 +++++++++++---
.../kafka/connect/runtime/WorkerSourceTask.java | 33 +++++++++++++++++++---
2 files changed, 46 insertions(+), 8 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 0132c66..ee3fa6f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -130,10 +130,23 @@ class WorkerSinkTask extends WorkerTask {
protected void close() {
// FIXME Kafka needs to add a timeout parameter here for us to properly obey the timeout
// passed in
- task.stop();
- if (consumer != null)
- consumer.close();
- transformationChain.close();
+ try {
+ task.stop();
+ } catch (Throwable t) {
+ log.warn("Could not stop task", t);
+ }
+ if (consumer != null) {
+ try {
+ consumer.close();
+ } catch (Throwable t) {
+ log.warn("Could not close consumer", t);
+ }
+ }
+ try {
+ transformationChain.close();
+ } catch (Throwable t) {
+ log.warn("Could not close transformation chain", t);
+ }
}
@Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 163c7d0..f54365b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -74,6 +74,7 @@ class WorkerSourceTask extends WorkerTask {
private Map<String, String> taskConfig;
private boolean finishedStart = false;
private boolean startedShutdownBeforeStartCompleted = false;
+ private boolean stopped = false;
public WorkerSourceTask(ConnectorTaskId id,
SourceTask task,
@@ -119,8 +120,21 @@ class WorkerSourceTask extends WorkerTask {
}
protected void close() {
- producer.close(30, TimeUnit.SECONDS);
- transformationChain.close();
+ if (!shouldPause()) {
+ tryStop();
+ }
+ if (producer != null) {
+ try {
+ producer.close(30, TimeUnit.SECONDS);
+ } catch (Throwable t) {
+ log.warn("Could not close producer", t);
+ }
+ }
+ try {
+ transformationChain.close();
+ } catch (Throwable t) {
+ log.warn("Could not close transformation chain", t);
+ }
}
@Override
@@ -129,12 +143,23 @@ class WorkerSourceTask extends WorkerTask {
stopRequestedLatch.countDown();
synchronized (this) {
if (finishedStart)
- task.stop();
+ tryStop();
else
startedShutdownBeforeStartCompleted = true;
}
}
+ private synchronized void tryStop() {
+ if (!stopped) {
+ try {
+ task.stop();
+ stopped = true;
+ } catch (Throwable t) {
+ log.warn("Could not stop task", t);
+ }
+ }
+ }
+
@Override
public void execute() {
try {
@@ -143,7 +168,7 @@ class WorkerSourceTask extends WorkerTask {
log.info("Source task {} finished initialization and start", this);
synchronized (this) {
if (startedShutdownBeforeStartCompleted) {
- task.stop();
+ tryStop();
return;
}
finishedStart = true;
--
To stop receiving notification emails like this one, please contact
ewencp@apache.org.