You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/12/24 22:23:50 UTC
kafka git commit: KAFKA-4527: task status was being updated before
actual pause/resume
Repository: kafka
Updated Branches:
refs/heads/trunk 9eb665c39 -> d09214624
KAFKA-4527: task status was being updated before actual pause/resume
h/t ewencp for pointing out the issue
Author: Shikhar Bhushan <sh...@confluent.io>
Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>
Closes #2277 from shikhar/kafka-4527
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d0921462
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d0921462
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d0921462
Branch: refs/heads/trunk
Commit: d092146247f3b3de061ecdb4ddfeac9321d8cf73
Parents: 9eb665c
Author: Shikhar Bhushan <sh...@confluent.io>
Authored: Sat Dec 24 17:25:01 2016 -0500
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Sat Dec 24 17:25:01 2016 -0500
----------------------------------------------------------------------
.../kafka/connect/runtime/WorkerSinkTask.java | 3 +
.../kafka/connect/runtime/WorkerSourceTask.java | 5 +-
.../kafka/connect/runtime/WorkerTask.java | 71 ++++++++++----------
.../connect/runtime/WorkerSinkTaskTest.java | 7 +-
.../connect/runtime/WorkerSourceTaskTest.java | 20 ++----
5 files changed, 53 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0921462/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index b941469..a284ec7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -181,8 +181,11 @@ class WorkerSinkTask extends WorkerTask {
if (shouldPause()) {
pauseAll();
+ onPause();
+ context.requestCommit();
} else if (!pausedForRedelivery) {
resumeAll();
+ onResume();
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0921462/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 260015e..05a07b8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -146,7 +146,10 @@ class WorkerSourceTask extends WorkerTask {
while (!isStopping()) {
if (shouldPause()) {
- awaitUnpause();
+ onPause();
+ if (awaitUnpause()) {
+ onResume();
+ }
continue;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0921462/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index 2f2ebb5..04fb333 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -23,8 +23,6 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
/**
* Handles processing for an individual task. This interface only provides the basic methods
@@ -41,21 +39,20 @@ abstract class WorkerTask implements Runnable {
private static final Logger log = LoggerFactory.getLogger(WorkerTask.class);
protected final ConnectorTaskId id;
- private final AtomicBoolean stopping; // indicates whether the Worker has asked the task to stop
- private final AtomicBoolean cancelled; // indicates whether the Worker has cancelled the task (e.g. because of slow shutdown)
- private final CountDownLatch shutdownLatch;
private final TaskStatus.Listener statusListener;
- private final AtomicReference<TargetState> targetState;
+ private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+ private volatile TargetState targetState;
+ private volatile boolean stopping; // indicates whether the Worker has asked the task to stop
+ private volatile boolean cancelled; // indicates whether the Worker has cancelled the task (e.g. because of slow shutdown)
public WorkerTask(ConnectorTaskId id,
TaskStatus.Listener statusListener,
TargetState initialState) {
this.id = id;
- this.stopping = new AtomicBoolean(false);
- this.cancelled = new AtomicBoolean(false);
- this.shutdownLatch = new CountDownLatch(1);
this.statusListener = statusListener;
- this.targetState = new AtomicReference<>(initialState);
+ this.targetState = initialState;
+ this.stopping = false;
+ this.cancelled = false;
}
public ConnectorTaskId id() {
@@ -71,7 +68,7 @@ abstract class WorkerTask implements Runnable {
private void triggerStop() {
synchronized (this) {
- this.stopping.set(true);
+ stopping = true;
// wakeup any threads that are waiting for unpause
this.notifyAll();
@@ -91,7 +88,7 @@ abstract class WorkerTask implements Runnable {
* updated when it eventually does shutdown.
*/
public void cancel() {
- this.cancelled.set(true);
+ cancelled = true;
}
/**
@@ -113,7 +110,7 @@ abstract class WorkerTask implements Runnable {
protected abstract void close();
protected boolean isStopping() {
- return stopping.get();
+ return stopping;
}
private void doClose() {
@@ -125,16 +122,18 @@ abstract class WorkerTask implements Runnable {
}
}
- private void doRun() {
+ private void doRun() throws InterruptedException {
try {
synchronized (this) {
- if (stopping.get())
+ if (stopping)
return;
- if (targetState.get() == TargetState.PAUSED)
- statusListener.onPause(id);
- else
- statusListener.onStartup(id);
+ if (targetState == TargetState.PAUSED) {
+ onPause();
+ if (!awaitUnpause()) return;
+ }
+
+ statusListener.onStartup(id);
}
execute();
@@ -153,7 +152,7 @@ abstract class WorkerTask implements Runnable {
// if we were cancelled, skip the status update since the task may have already been
// started somewhere else
- if (!cancelled.get())
+ if (!cancelled)
statusListener.onShutdown(id);
}
}
@@ -164,11 +163,19 @@ abstract class WorkerTask implements Runnable {
// if we were cancelled, skip the status update since the task may have already been
// started somewhere else
- if (!cancelled.get())
+ if (!cancelled)
statusListener.onFailure(id, t);
}
}
+ protected synchronized void onPause() {
+ statusListener.onPause(id);
+ }
+
+ protected synchronized void onResume() {
+ statusListener.onResume(id);
+ }
+
@Override
public void run() {
try {
@@ -178,14 +185,14 @@ abstract class WorkerTask implements Runnable {
onFailure(t);
if (t instanceof Error)
- throw t;
+ throw (Error) t;
} finally {
shutdownLatch.countDown();
}
}
public boolean shouldPause() {
- return this.targetState.get() == TargetState.PAUSED;
+ return this.targetState == TargetState.PAUSED;
}
/**
@@ -195,8 +202,8 @@ abstract class WorkerTask implements Runnable {
*/
protected boolean awaitUnpause() throws InterruptedException {
synchronized (this) {
- while (targetState.get() == TargetState.PAUSED) {
- if (stopping.get())
+ while (targetState == TargetState.PAUSED) {
+ if (stopping)
return false;
this.wait();
}
@@ -207,19 +214,11 @@ abstract class WorkerTask implements Runnable {
public void transitionTo(TargetState state) {
synchronized (this) {
// ignore the state change if we are stopping
- if (stopping.get())
+ if (stopping)
return;
- TargetState oldState = this.targetState.getAndSet(state);
- if (state != oldState) {
- if (state == TargetState.PAUSED) {
- statusListener.onPause(id);
- } else if (state == TargetState.STARTED) {
- statusListener.onResume(id);
- this.notifyAll();
- } else
- throw new IllegalArgumentException("Unhandled target state " + state);
- }
+ this.targetState = state;
+ this.notifyAll();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0921462/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 1f9e56b..d4427d1 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -178,7 +178,9 @@ public class WorkerSinkTaskTest {
consumer.pause(partitions);
PowerMock.expectLastCall();
- // No records returned
+ // Offset commit as requested when pausing; No records returned by consumer.poll()
+ sinkTask.preCommit(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject());
+ EasyMock.expectLastCall().andStubReturn(Collections.emptyMap());
expectConsumerPoll(0);
sinkTask.put(Collections.<SinkRecord>emptyList());
EasyMock.expectLastCall();
@@ -353,6 +355,9 @@ public class WorkerSinkTaskTest {
consumer.resume(Collections.singleton(TOPIC_PARTITION2));
EasyMock.expectLastCall();
+ statusListener.onResume(taskId);
+ EasyMock.expectLastCall();
+
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0921462/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 86f5797..71b315f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -130,41 +130,31 @@ public class WorkerSourceTaskTest extends ThreadedTest {
@Test
public void testStartPaused() throws Exception {
- final CountDownLatch startupLatch = new CountDownLatch(1);
+ final CountDownLatch pauseLatch = new CountDownLatch(1);
createWorkerTask(TargetState.PAUSED);
- sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
- EasyMock.expectLastCall();
- sourceTask.start(TASK_PROPS);
+ statusListener.onPause(taskId);
EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
@Override
public Void answer() throws Throwable {
- startupLatch.countDown();
+ pauseLatch.countDown();
return null;
}
});
- statusListener.onPause(taskId);
- EasyMock.expectLastCall();
- // we shouldn't get any calls to poll()
-
- sourceTask.stop();
+ producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
EasyMock.expectLastCall();
- expectOffsetFlush(true);
statusListener.onShutdown(taskId);
EasyMock.expectLastCall();
- producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
- EasyMock.expectLastCall();
-
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
Future<?> taskFuture = executor.submit(workerTask);
- assertTrue(startupLatch.await(5, TimeUnit.SECONDS));
+ assertTrue(pauseLatch.await(5, TimeUnit.SECONDS));
workerTask.stop();
assertTrue(workerTask.awaitStop(1000));